2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
// Registry is the subscription table: subscriptions are kept in a map keyed
// by a uint32 id, and route changes are issued through rtmgrClient.
// NOTE(review): the methods of this type also use r.mutex and r.subIds; the
// declarations of those fields are not visible in this excerpt.
36 type Registry struct {
// register maps a subscription id (the methods index it with ReqId.Seq) to
// its Subscription.
38 register map[uint32]*Subscription
// rtmgrClient is the client the methods call for SubscriptionRequestCreate,
// SubscriptionRequestUpdate and SubscriptionRequestDelete.
40 rtmgrClient *RtmgrClient
// Initialize creates an empty subscription table and appends the ids
// 1..65535, in ascending order, to r.subIds (the loop runs i = 0..65534 and
// stores i+1).
// NOTE(review): the declaration of i and the closing braces are not visible
// in this excerpt.
43 func (r *Registry) Initialize() {
44 r.register = make(map[uint32]*Subscription)
46 for i = 0; i < 65535; i++ {
// Ids start at 1, so this loop never puts 0 into r.subIds.
47 r.subIds = append(r.subIds, i+1)
// QueryHandler builds a models.SubscriptionList with one entry per
// subscription currently stored in r.register. Each entry carries the
// subscription's ReqId.Seq (converted to int64), its Meid.RanName and the
// string-list form of its endpoint list.
// Entries follow Go map iteration order, which is unspecified.
// NOTE(review): the r.mutex.Lock() matching the deferred unlock below, and
// the return statement, are not visible in this excerpt - confirm the lock
// is taken before the defer.
51 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
53 defer r.mutex.Unlock()
55 resp := models.SubscriptionList{}
56 for _, subs := range r.register {
58 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.Seq), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
// allocateSubs takes the first id from r.subIds and builds a new Subscription
// that uses it as ReqId.Seq, then adds the transaction's endpoint to the new
// subscription's endpoint list.
//
// It returns an error when r.subIds is empty, when the taken id is already
// present in r.register, or when AddEndpoint reports false. In the latter two
// cases the id is appended back to the tail of r.subIds before returning.
//
// NOTE(review): no locking is visible in this function; its visible caller,
// AssignToSubscription, defers r.mutex.Unlock() before calling it, so
// presumably r.mutex is held here - confirm. The Subscription literal's
// fields and the success return are not visible in this excerpt.
64 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
65 if len(r.subIds) > 0 {
// Pop the id at the head of the list.
66 sequenceNumber := r.subIds[0]
67 r.subIds = r.subIds[1:]
// An id taken from r.subIds should not be in use; if it is, hand it back and
// fail.
68 if _, ok := r.register[sequenceNumber]; ok == true {
69 r.subIds = append(r.subIds, sequenceNumber)
70 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
72 subs := &Subscription{
79 subs.ReqId.Seq = sequenceNumber
// AddEndpoint returning false is treated as "endpoint already present"; the
// id is given back because the subscription is not created.
81 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
82 r.subIds = append(r.subIds, subs.ReqId.Seq)
83 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when r.subIds was empty.
88 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs walks r.register looking for a subscription for which
// subs.IsMergeable(trans, subReqMsg) is true, and then checks that candidate:
// whether it is still valid, whether its endpoint list is empty, and whether
// the transaction's endpoint can be added to it.
//
// NOTE(review): the bodies of those checks, any locking around them and the
// return statements are not visible in this excerpt, so what happens when a
// check fails (skip the entry, or stop searching) cannot be stated from here.
// Presumably a candidate that passes all checks is returned and nil is
// returned when none does - confirm.
91 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
93 for _, subs := range r.register {
94 if subs.IsMergeable(trans, subReqMsg) {
97 // Check whether there has been a race condition
100 // subs has been set to invalid
101 if subs.valid == false {
105 // If size is zero, entry is to be deleted
106 if subs.EpList.Size() == 0 {
110 // Try to add the endpoint to the endpoint list.
111 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
117 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches a transaction to a subscription and returns
// that subscription. The visible steps are:
//   - classify the request with CheckActionTypes;
//   - for a Policy request, look up r.register under trans.GetSubId() and, if
//     an entry exists, call SetCachedResponse(nil, true) on it;
//   - look for a mergeable subscription with findExistingSubs, and call
//     allocateSubs to create a new one;
//   - send a route create or a route update to rtmgrClient;
//   - store the subscription in r.register under its ReqId.Seq.
//
// NOTE(review): the conditions selecting between these branches, the
// r.mutex.Lock()/subs.mutex.Lock() calls matching the deferred unlocks, and
// all return statements are not visible in this excerpt. The hedged comments
// below mark where the visible lines do not establish the control flow.
124 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
128 defer r.mutex.Unlock()
131 // Check validity of subscription action types
133 actionType, err := r.CheckActionTypes(subReqMsg)
// Presumably inside an err != nil branch that also returns - confirm.
135 xapp.Logger.Debug("CREATE %s", err)
140 // Find possible existing Policy subscription
142 if actionType == e2ap.E2AP_ActionTypePolicy {
143 if subs, ok := r.register[trans.GetSubId()]; ok {
144 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
145 subs.SetCachedResponse(nil, true)
150 subs := r.findExistingSubs(trans, subReqMsg)
// Presumably only reached when no mergeable subscription was found - confirm.
152 subs, err = r.allocateSubs(trans, subReqMsg)
160 // Add to subscription
163 defer subs.mutex.Unlock()
165 epamount := subs.EpList.Size()
169 // Subscription route updates
// Two alternatives follow, a route create and a route update; presumably the
// choice depends on epamount - confirm. Seq is narrowed to uint16; the ids
// Initialize puts into r.subIds are 1..65535, which fit.
172 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
173 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
175 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
176 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// Gives the id back to r.subIds; presumably on the route-error path - confirm.
182 r.subIds = append(r.subIds, subs.ReqId.Seq)
// Publish the subscription in the table under its sequence number.
188 r.register[subs.ReqId.Seq] = subs
190 xapp.Logger.Debug("CREATE %s", subs.String())
191 xapp.Logger.Debug("Registry: substable=%v", r.register)
// CheckActionTypes classifies a subscription request by the action types in
// subReqMsg.ActionSetups, using two flags, reportFound and policyFound.
//
// Returns:
//   - E2AP_ActionTypeInvalid and an error when both flags are set (Report and
//     Policy mixed in one request);
//   - E2AP_ActionTypeReport, nil when reportFound is set;
//   - E2AP_ActionTypePolicy, nil when policyFound is set;
//   - E2AP_ActionTypeInvalid and an error when neither is set.
//
// NOTE(review): the statements that set the flags inside the loop, and the
// closing braces, are not visible in this excerpt; presumably each matching
// branch sets its flag and the checks below run after the loop - confirm.
195 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
196 var reportFound bool = false
197 var policyFound bool = false
199 for _, acts := range subReqMsg.ActionSetups {
200 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
203 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
// The mixed case is tested first so that it wins over the single-type returns.
207 if reportFound == true && policyFound == true {
208 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
210 if reportFound == true {
211 return e2ap.E2AP_ActionTypeReport, nil
213 if policyFound == true {
214 return e2ap.E2AP_ActionTypePolicy, nil
// Neither a Report nor a Policy action was seen.
216 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription removes the transaction's endpoint from subs and
// then cleans up: optionally sleeps for waitRouteClean, deletes the route for
// that endpoint via rtmgrClient, and either releases the subscription
// (removes it from r.register and appends its id back to r.subIds) or, when
// endpoints remain, sends a route update.
//
// Parameters:
//   - subs: subscription to remove the endpoint from.
//   - trans: transaction whose endpoint is removed.
//   - waitRouteClean: when > 0, time slept before the cleanup steps.
//
// NOTE(review): the Lock() calls, the condition selecting release versus
// update, the returns and the exact nesting are not visible in this excerpt.
// r.mutex.Unlock() and subs.mutex.Unlock() are each deferred twice, which
// suggests part of the body runs in a nested function literal - confirm.
219 // TODO: Works with concurrent calls, but check if can be improved
220 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
223 defer r.mutex.Unlock()
225 defer subs.mutex.Unlock()
227 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
// Snapshot the remaining endpoint count and the id for the cleanup steps.
228 epamount := subs.EpList.Size()
229 seqId := subs.ReqId.Seq
// DelEndpoint reported false; the handling of this case is not visible here.
231 if delStatus == false {
236 if waitRouteClean > 0 {
237 time.Sleep(waitRouteClean)
241 defer subs.mutex.Unlock()
242 xapp.Logger.Info("CLEAN %s", subs.String())
246 // Subscription route delete
// The delete request carries a list holding only this transaction's endpoint.
248 tmpList := xapptweaks.RmrEndpointList{}
249 tmpList.AddEndpoint(trans.GetEndpoint())
250 subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
// NOTE(review): the result of SubscriptionRequestDelete is discarded here.
251 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
254 // Subscription release
257 defer r.mutex.Unlock()
259 if _, ok := r.register[seqId]; ok {
260 xapp.Logger.Debug("RELEASE %s", subs.String())
261 delete(r.register, seqId)
262 xapp.Logger.Debug("Registry: substable=%v", r.register)
// Return the id to the tail of r.subIds so it can be allocated again.
264 r.subIds = append(r.subIds, seqId)
266 } else if subs.EpList.Size() > 0 {
268 // Subscription route updates
270 subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
// NOTE(review): the result of SubscriptionRequestUpdate is discarded here,
// whereas AssignToSubscription assigns it to err.
271 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// GetSubscription returns the subscription stored in r.register under sn.
// NOTE(review): the r.mutex.Lock() matching the deferred unlock and the
// not-found return are not visible in this excerpt; presumably nil is
// returned when sn is absent - confirm.
279 func (r *Registry) GetSubscription(sn uint32) *Subscription {
281 defer r.mutex.Unlock()
// The map is consulted twice (presence test, then fetch); a single
// "subs, ok := r.register[sn]" lookup would give the same result.
282 if _, ok := r.register[sn]; ok {
283 return r.register[sn]
// GetSubscriptionFirstMatch returns the subscription for the first id in ids,
// in the order given, that is present in r.register. When none of the ids is
// present it returns nil and an error listing the ids that were tried.
// NOTE(review): the r.mutex.Lock() matching the deferred unlock and the
// closing braces are not visible in this excerpt.
288 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
290 defer r.mutex.Unlock()
291 for _, id := range ids {
// Presence test followed by a second lookup of the same key.
292 if _, ok := r.register[id]; ok {
293 return r.register[id], nil
296 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)