2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
29 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
38 type RESTSubscription struct {
40 xAppServiceName string
41 xAppRmrEndPoint string
44 xAppIdToE2Id map[int64]int64
50 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
52 for _, v := range r.InstanceIds {
59 r.InstanceIds = append(r.InstanceIds, instanceId)
62 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
64 r.lastReqMd5sum = md5sum
66 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
70 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
71 r.InstanceIds = r.InstanceIds[1:]
74 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
75 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
78 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
79 return r.xAppIdToE2Id[xAppEventInstanceID]
82 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
83 delete(r.xAppIdToE2Id, xAppEventInstanceID)
86 func (r *RESTSubscription) SetProcessed(err error) {
87 r.SubReqOngoing = false
93 type Registry struct {
95 register map[uint32]*Subscription
97 rtmgrClient *RtmgrClient
98 restSubscriptions map[string]*RESTSubscription
101 func (r *Registry) Initialize() {
102 r.mutex = new(sync.Mutex)
103 r.register = make(map[uint32]*Subscription)
104 r.restSubscriptions = make(map[string]*RESTSubscription)
107 for i = 1; i < 65535; i++ {
108 r.subIds = append(r.subIds, i)
112 func (r *Registry) GetAllRestSubscriptionsJson() []byte {
115 defer r.mutex.Unlock()
116 restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
118 xapp.Logger.Error("GetAllRestSubscriptions() json.Marshal error: %v", err)
120 return restSubscriptionsJson
123 func (r *Registry) GetAllE2NodeRestSubscriptionsJson(ranName string) []byte {
125 restSubscriptions := r.GetAllE2NodeRestSubscriptions(ranName)
126 e2NodeRestSubscriptionsJson, err := json.Marshal(restSubscriptions)
128 xapp.Logger.Error("GetE2NodeRestSubscriptions() json.Marshal error: %v", err)
130 return e2NodeRestSubscriptionsJson
133 func (r *Registry) GetAllE2NodeRestSubscriptions(ranName string) map[string]RESTSubscription {
136 defer r.mutex.Unlock()
137 var restSubscriptions map[string]RESTSubscription
138 restSubscriptions = make(map[string]RESTSubscription)
139 for restSubsId, restSubscription := range r.restSubscriptions {
140 if restSubscription.Meid == ranName {
141 restSubscriptions[restSubsId] = *restSubscription
144 return restSubscriptions
147 func (r *Registry) GetAllXappsJson() []byte {
150 var xappList []string
151 var xappsMap map[string]string
152 xappsMap = make(map[string]string)
153 for _, restSubscription := range r.restSubscriptions {
154 _, ok := xappsMap[restSubscription.xAppServiceName]
156 xappsMap[restSubscription.xAppServiceName] = restSubscription.xAppServiceName
157 xappList = append(xappList, restSubscription.xAppServiceName)
162 xappsJson, err := json.Marshal(xappList)
164 xapp.Logger.Error("GetXapps() json.Marshal error: %v", err)
169 func (r *Registry) GetAllXapps() map[string]string {
172 defer r.mutex.Unlock()
173 var xappsMap map[string]string
174 xappsMap = make(map[string]string)
175 for _, restSubscription := range r.restSubscriptions {
176 _, ok := xappsMap[restSubscription.xAppServiceName]
178 xappsMap[restSubscription.xAppServiceName] = restSubscription.xAppServiceName
184 func (r *Registry) GetAllXappRestSubscriptionsJson(xAppServiceName string) []byte {
186 xappRestSubscriptions := r.GetAllXappRestSubscriptions(xAppServiceName)
187 xappRestSubscriptionsJson, err := json.Marshal(xappRestSubscriptions)
189 xapp.Logger.Error("GetXappRestSubscriptions() json.Marshal error: %v", err)
191 return xappRestSubscriptionsJson
194 func (r *Registry) GetAllXappRestSubscriptions(xAppServiceName string) map[string]RESTSubscription {
197 defer r.mutex.Unlock()
198 var xappRestSubscriptions map[string]RESTSubscription
199 xappRestSubscriptions = make(map[string]RESTSubscription)
200 for restSubsId, xappRestSubscription := range r.restSubscriptions {
201 if xappRestSubscription.xAppServiceName == xAppServiceName {
202 xappRestSubscriptions[restSubsId] = *xappRestSubscription
205 return xappRestSubscriptions
208 func (r *Registry) GetE2SubscriptionsJson(restSubsId string) ([]byte, error) {
210 // Get all E2 subscriptions of a REST subscription
211 restSubs, err := r.GetRESTSubscription(restSubsId, false)
217 var e2Subscriptions []Subscription
218 for _, e2SubId := range restSubs.InstanceIds {
219 e2Subscription, ok := r.register[e2SubId]
221 e2Subscriptions = append(e2Subscriptions, *e2Subscription)
225 e2SubscriptionsJson, err := json.Marshal(e2Subscriptions)
227 xapp.Logger.Error("GetE2Subscriptions() json.Marshal error: %v", err)
229 return e2SubscriptionsJson, nil
232 func (r *Registry) CreateRESTSubscription(restSubId *string, xappServiceName *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
234 defer r.mutex.Unlock()
235 newRestSubscription := RESTSubscription{}
236 newRestSubscription.Created = time.Now().Format("2006-01-02 15:04:05.000")
237 newRestSubscription.xAppServiceName = *xappServiceName
238 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
239 newRestSubscription.Meid = *maid
240 newRestSubscription.SubReqOngoing = true
241 newRestSubscription.SubDelReqOngoing = false
242 r.restSubscriptions[*restSubId] = &newRestSubscription
243 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
244 xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
245 return &newRestSubscription
248 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
250 defer r.mutex.Unlock()
251 delete(r.restSubscriptions, *restSubId)
252 xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
255 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
257 defer r.mutex.Unlock()
258 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
259 // Subscription deletion is not allowed if prosessing subscription request in not ready
260 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
261 if IsDelReqOngoing == true {
262 restSubscription.SubDelReqOngoing = true
264 r.restSubscriptions[restSubId] = restSubscription
265 return restSubscription, nil
267 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
270 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
273 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
275 defer r.mutex.Unlock()
277 resp := models.SubscriptionList{}
278 for _, subs := range r.register {
280 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
286 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
287 if len(r.subIds) > 0 {
289 r.subIds = r.subIds[1:]
290 if _, ok := r.register[subId]; ok == true {
291 r.subIds = append(r.subIds, subId)
292 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
294 subs := &Subscription{
297 RMRRouteCreated: rmrRoutecreated,
298 SubReqMsg: subReqMsg,
303 RetryFromXapp: false,
307 DoNotWaitSubResp: false,
309 subs.ReqId.Id = subReqMsg.RequestId.Id
310 subs.ReqId.InstanceId = subId
311 r.SetResetTestFlag(resetTestFlag, subs)
313 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
314 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
315 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
319 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
322 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
324 for _, subs := range r.register {
325 if subs.IsMergeable(trans, subReqMsg) {
328 // check if there has been race conditions
331 //subs has been set to invalid
332 if subs.valid == false {
336 // If size is zero, entry is to be deleted
337 if subs.EpList.Size() == 0 {
341 // Try to add to endpointlist. Adding fails if endpoint is already in the list
342 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
344 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
349 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
356 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
359 errorInfo := ErrorInfo{}
361 defer r.mutex.Unlock()
364 // Check validity of subscription action types
366 actionType, err := r.CheckActionTypes(subReqMsg)
368 xapp.Logger.Debug("CREATE %s", err)
369 err = fmt.Errorf("E2 content validation failed")
370 return nil, errorInfo, err
374 // Find possible existing Policy subscription
376 if actionType == e2ap.E2AP_ActionTypePolicy {
377 if subs, ok := r.register[trans.GetSubId()]; ok {
378 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
379 // Update message data to subscription
380 subs.SubReqMsg = subReqMsg
381 subs.PolicyUpdate = true
382 subs.SetCachedResponse(nil, true)
383 r.SetResetTestFlag(resetTestFlag, subs)
384 return subs, errorInfo, nil
388 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
390 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
391 xapp.Logger.Error("%s", err.Error())
392 err = fmt.Errorf("subscription not allocated")
393 return nil, errorInfo, err
396 } else if endPointFound == true {
397 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
398 subs.RetryFromXapp = true
399 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
400 c.UpdateCounter(cDuplicateE2SubReq)
401 return subs, errorInfo, nil
405 // Add to subscription
408 defer subs.mutex.Unlock()
410 epamount := subs.EpList.Size()
411 xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
415 // Subscription route updates
417 if createRMRRoute == true {
419 errorInfo, err = r.RouteCreate(subs, c)
421 errorInfo, err = r.RouteCreateUpdate(subs, c)
424 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
430 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
432 // Delete already added endpoint for the request
433 subs.EpList.DelEndpoint(trans.GetEndpoint())
434 return nil, errorInfo, err
438 r.register[subs.ReqId.InstanceId] = subs
440 xapp.Logger.Debug("CREATE %s", subs.String())
441 xapp.Logger.Debug("Registry: substable=%v", r.register)
442 return subs, errorInfo, nil
445 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
446 errorInfo := ErrorInfo{}
447 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
448 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
450 if strings.Contains(err.Error(), "status 400") {
451 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
453 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
455 errorInfo.ErrorCause = err.Error()
456 c.UpdateCounter(cRouteCreateFail)
457 xapp.Logger.Error("%s", err.Error())
458 err = fmt.Errorf("RTMGR route create failure")
460 return errorInfo, err
463 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
464 errorInfo := ErrorInfo{}
465 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
466 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
468 if strings.Contains(err.Error(), "status 400") {
469 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
471 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
473 errorInfo.ErrorCause = err.Error()
474 c.UpdateCounter(cRouteCreateUpdateFail)
475 xapp.Logger.Error("%s", err.Error())
476 err = fmt.Errorf("RTMGR route update failure")
477 return errorInfo, err
479 c.UpdateCounter(cMergedSubscriptions)
480 return errorInfo, err
483 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
484 var reportFound bool = false
485 var policyFound bool = false
486 var insertFound bool = false
488 for _, acts := range subReqMsg.ActionSetups {
489 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
492 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
495 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
499 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
500 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
502 if reportFound == true {
503 return e2ap.E2AP_ActionTypeReport, nil
505 if policyFound == true {
506 return e2ap.E2AP_ActionTypePolicy, nil
508 if insertFound == true {
509 return e2ap.E2AP_ActionTypeInsert, nil
511 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
514 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) {
516 xapp.Logger.Debug("RemoveFromSubscription %s", idstring(nil, trans, subs, trans))
518 defer r.mutex.Unlock()
520 defer subs.mutex.Unlock()
522 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
523 epamount := subs.EpList.Size()
525 subId := subs.ReqId.InstanceId
526 if delStatus == false {
530 if waitRouteClean > 0 {
531 // Wait here that response is delivered to xApp via RMR before route is cleaned
532 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
534 time.Sleep(waitRouteClean)
538 xapp.Logger.Debug("CLEAN %s", subs.String())
542 // Subscription route delete
544 if subs.RMRRouteCreated == true {
545 r.RouteDelete(subs, trans, c)
548 // Not merged subscription is being deleted
549 xapp.Logger.Debug("Subscription route delete RemoveSubscriptionFromDb")
550 c.RemoveSubscriptionFromDb(subs)
553 // Subscription release
556 if _, ok := r.register[subId]; ok {
557 xapp.Logger.Debug("RELEASE %s", subs.String())
558 delete(r.register, subId)
559 xapp.Logger.Debug("Registry: substable=%v", r.register)
561 r.subIds = append(r.subIds, subId)
562 } else if subs.EpList.Size() > 0 {
564 // Subscription route update
566 if subs.RMRRouteCreated == true {
567 r.RouteDeleteUpdate(subs, c)
570 // Endpoint of merged subscription is being deleted
571 xapp.Logger.Debug("Subscription route update WriteSubscriptionToDb")
572 err := c.WriteSubscriptionToDb(subs)
574 xapp.Logger.Error("tracker.UnTrackTransaction() failed:%s", err.Error())
576 c.UpdateCounter(cUnmergedSubscriptions)
581 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
582 tmpList := xapp.RmrEndpointList{}
583 tmpList.AddEndpoint(trans.GetEndpoint())
584 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
585 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
586 c.UpdateCounter(cRouteDeleteFail)
590 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
591 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
592 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
593 c.UpdateCounter(cRouteDeleteUpdateFail)
597 func (r *Registry) GetSubscription(subId uint32) *Subscription {
599 defer r.mutex.Unlock()
600 if _, ok := r.register[subId]; ok {
601 return r.register[subId]
606 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
608 defer r.mutex.Unlock()
609 for _, subId := range subIds {
610 if _, ok := r.register[subId]; ok {
611 return r.register[subId], nil
614 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
617 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
618 if resetTestFlag == true {
619 // This is used in submgr restart unit tests
620 xapp.Logger.Debug("resetTestFlag == true")
621 subs.DoNotWaitSubResp = true
623 xapp.Logger.Debug("resetTestFlag == false")
627 func (r *Registry) DeleteAllE2Subscriptions(ranName string, c *Control) {
629 xapp.Logger.Debug("Registry: DeleteAllE2Subscriptions()")
630 for subId, subs := range r.register {
631 if subs.Meid.RanName == ranName {
632 if subs.OngoingReqCount != 0 || subs.OngoingDelCount != 0 {
633 // Subscription creation or deletion processes need to be processed gracefully till the end.
634 // Subscription is deleted at end of the process in both cases.
635 xapp.Logger.Debug("Registry: E2 subscription under prosessing ongoing cannot delete it yet. subId=%v, OngoingReqCount=%v, OngoingDelCount=%v", subId, subs.OngoingReqCount, subs.OngoingDelCount)
639 if subs.RMRRouteCreated == true {
640 for _, ep := range subs.EpList.Endpoints {
641 tmpList := xapp.RmrEndpointList{}
642 tmpList.AddEndpoint(&ep)
643 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
644 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
645 c.UpdateCounter(cRouteDeleteFail)
649 // Delete E2 subscription from registry and db
650 xapp.Logger.Debug("Registry: Subscription delete. subId=%v", subId)
651 delete(r.register, subId)
652 r.subIds = append(r.subIds, subId)
653 c.RemoveSubscriptionFromDb(subs)
658 // Delete REST subscription from registry and db
659 for restSubId, restSubs := range r.restSubscriptions {
660 if restSubs.Meid == ranName {
661 if restSubs.SubReqOngoing == true || restSubs.SubDelReqOngoing == true {
662 // Subscription creation or deletion processes need to be processed gracefully till the end.
663 // Subscription is deleted at end of the process in both cases.
664 xapp.Logger.Debug("Registry: REST subscription under prosessing ongoing cannot delete it yet. RestSubId=%v, SubReqOngoing=%v, SubDelReqOngoing=%v", restSubId, restSubs.SubReqOngoing, restSubs.SubDelReqOngoing)
667 xapp.Logger.Debug("Registry: REST subscription delete. subId=%v", restSubId)
668 delete(r.restSubscriptions, restSubId)
669 c.RemoveRESTSubscriptionFromDb(restSubId)