2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
29 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// RESTSubscription is the registry's record of one REST-level subscription.
// NOTE(review): only part of the field list is visible in this listing; the
// methods in this file also use Created, Meid, InstanceIds, SubReqOngoing,
// SubDelReqOngoing and lastReqMd5sum — confirm their declarations against the
// complete file.
38 type RESTSubscription struct {
// xAppServiceName is filled in by CreateRESTSubscription and is the value the
// per-xApp query methods compare against.
40 xAppServiceName string
// xAppRmrEndPoint is filled in by CreateRESTSubscription.
41 xAppRmrEndPoint string
// xAppIdToE2Id maps an xApp event instance ID to an E2 event instance ID.
44 xAppIdToE2Id map[int64]int64
50 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
52 for _, v := range r.InstanceIds {
59 r.InstanceIds = append(r.InstanceIds, instanceId)
62 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
64 r.lastReqMd5sum = md5sum
66 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
70 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
71 r.InstanceIds = r.InstanceIds[1:]
74 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
75 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
78 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
79 return r.xAppIdToE2Id[xAppEventInstanceID]
82 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
83 delete(r.xAppIdToE2Id, xAppEventInstanceID)
// SetProcessed marks the pending subscription request as finished by clearing
// SubReqOngoing.
// NOTE(review): the remaining statements of this method are missing from this
// listing, so how the err argument is used cannot be determined here — confirm
// against the complete file.
86 func (r *RESTSubscription) SetProcessed(err error) {
87 r.SubReqOngoing = false
// Registry is the in-memory store of E2 and REST subscriptions.
// NOTE(review): only part of the field list is visible in this listing; the
// methods in this file also use a mutex field and a subIds pool — confirm
// their declarations against the complete file.
93 type Registry struct {
// register holds the E2 subscriptions, keyed by subscription instance ID.
95 register map[uint32]*Subscription
// rtmgrClient is used for the route create, update and delete requests.
97 rtmgrClient *RtmgrClient
// restSubscriptions holds the REST subscriptions, keyed by REST subscription ID.
98 restSubscriptions map[string]*RESTSubscription
101 func (r *Registry) Initialize() {
102 r.mutex = new(sync.Mutex)
103 r.register = make(map[uint32]*Subscription)
104 r.restSubscriptions = make(map[string]*RESTSubscription)
107 for i = 1; i < 65535; i++ {
108 r.subIds = append(r.subIds, i)
112 func (r *Registry) GetAllRestSubscriptionsJson() []byte {
115 defer r.mutex.Unlock()
116 restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
118 xapp.Logger.Error("GetAllRestSubscriptions() json.Marshal error: %v", err)
120 return restSubscriptionsJson
123 func (r *Registry) GetAllE2NodeRestSubscriptionsJson(ranName string) []byte {
125 restSubscriptions := r.GetAllE2NodeRestSubscriptions(ranName)
126 e2NodeRestSubscriptionsJson, err := json.Marshal(restSubscriptions)
128 xapp.Logger.Error("GetE2NodeRestSubscriptions() json.Marshal error: %v", err)
130 return e2NodeRestSubscriptionsJson
133 func (r *Registry) GetAllE2NodeRestSubscriptions(ranName string) map[string]RESTSubscription {
136 defer r.mutex.Unlock()
137 var restSubscriptions map[string]RESTSubscription
138 restSubscriptions = make(map[string]RESTSubscription)
139 for restSubsId, restSubscription := range r.restSubscriptions {
140 if restSubscription.Meid == ranName {
141 restSubscriptions[restSubsId] = *restSubscription
144 return restSubscriptions
147 func (r *Registry) GetAllXappsJson() []byte {
150 var xappList []string
151 var xappsMap map[string]string
152 xappsMap = make(map[string]string)
153 for _, restSubscription := range r.restSubscriptions {
154 _, ok := xappsMap[restSubscription.xAppServiceName]
156 xappsMap[restSubscription.xAppServiceName] = restSubscription.xAppServiceName
157 xappList = append(xappList, restSubscription.xAppServiceName)
162 xappsJson, err := json.Marshal(xappList)
164 xapp.Logger.Error("GetXapps() json.Marshal error: %v", err)
169 func (r *Registry) GetAllXapps() map[string]string {
172 defer r.mutex.Unlock()
173 var xappsMap map[string]string
174 xappsMap = make(map[string]string)
175 for _, restSubscription := range r.restSubscriptions {
176 _, ok := xappsMap[restSubscription.xAppServiceName]
178 xappsMap[restSubscription.xAppServiceName] = restSubscription.xAppServiceName
184 func (r *Registry) GetAllXappRestSubscriptionsJson(xAppServiceName string) []byte {
186 xappRestSubscriptions := r.GetAllXappRestSubscriptions(xAppServiceName)
187 xappRestSubscriptionsJson, err := json.Marshal(xappRestSubscriptions)
189 xapp.Logger.Error("GetXappRestSubscriptions() json.Marshal error: %v", err)
191 return xappRestSubscriptionsJson
194 func (r *Registry) GetAllXappRestSubscriptions(xAppServiceName string) map[string]RESTSubscription {
197 defer r.mutex.Unlock()
198 var xappRestSubscriptions map[string]RESTSubscription
199 xappRestSubscriptions = make(map[string]RESTSubscription)
200 for restSubsId, xappRestSubscription := range r.restSubscriptions {
201 if xappRestSubscription.xAppServiceName == xAppServiceName {
202 xappRestSubscriptions[restSubsId] = *xappRestSubscription
205 return xappRestSubscriptions
// GetE2SubscriptionsJson returns the JSON encoding of the E2 subscriptions
// that belong to the REST subscription restSubsId.
//
// The REST subscription is fetched with GetRESTSubscription(restSubsId, false),
// which itself reports an error while a request or delete request is still
// ongoing for that subscription. Each ID in its InstanceIds is then looked up
// in r.register and a value copy of the Subscription is appended to the slice
// that gets marshalled.
//
// The final return passes a nil error; a json.Marshal failure is only logged.
// NOTE(review): the handling of the lookup error, the check on the map lookup
// result and any locking around r.register are missing from this listing —
// confirm against the complete file.
208 func (r *Registry) GetE2SubscriptionsJson(restSubsId string) ([]byte, error) {
210 // Get all E2 subscriptions of a REST subscription
211 restSubs, err := r.GetRESTSubscription(restSubsId, false)
217 var e2Subscriptions []Subscription
218 for _, e2SubId := range restSubs.InstanceIds {
219 e2Subscription, ok := r.register[e2SubId]
// Appends a copy of the Subscription value, not the registry's pointer.
221 e2Subscriptions = append(e2Subscriptions, *e2Subscription)
225 e2SubscriptionsJson, err := json.Marshal(e2Subscriptions)
227 xapp.Logger.Error("GetE2Subscriptions() json.Marshal error: %v", err)
229 return e2SubscriptionsJson, nil
232 func (r *Registry) CreateRESTSubscription(restSubId *string, xappServiceName *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
234 defer r.mutex.Unlock()
235 newRestSubscription := RESTSubscription{}
236 newRestSubscription.Created = time.Now()
237 newRestSubscription.xAppServiceName = *xappServiceName
238 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
239 newRestSubscription.Meid = *maid
240 newRestSubscription.SubReqOngoing = true
241 newRestSubscription.SubDelReqOngoing = false
242 r.restSubscriptions[*restSubId] = &newRestSubscription
243 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
244 xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
245 return &newRestSubscription
248 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
250 defer r.mutex.Unlock()
251 delete(r.restSubscriptions, *restSubId)
252 xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
255 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
257 defer r.mutex.Unlock()
258 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
259 // Subscription deletion is not allowed if prosessing subscription request in not ready
260 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
261 if IsDelReqOngoing == true {
262 restSubscription.SubDelReqOngoing = true
264 r.restSubscriptions[restSubId] = restSubscription
265 return restSubscription, nil
267 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
270 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList with one SubscriptionData
// entry per E2 subscription in r.register. Each entry carries the
// subscription's instance ID (as int64), its RAN name and its endpoint list
// as returned by EpList.StringList(). The registry mutex is released by a
// deferred Unlock.
// NOTE(review): the statements around the append and the return statement are
// missing from this listing — confirm against the complete file.
273 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
275 defer r.mutex.Unlock()
277 resp := models.SubscriptionList{}
278 for _, subs := range r.register {
280 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs creates a new Subscription for the request, using an ID taken
// from the pool of free IDs in r.subIds.
//
// Visible behaviour:
//   - when the pool is empty, the "no free ids" error is returned;
//   - when the chosen ID is already present in r.register, the ID is appended
//     back to the pool and an error is returned;
//   - the new Subscription gets the request message, the rmrRoutecreated flag,
//     the request's RequestId.Id and the chosen ID as its InstanceId;
//   - SetResetTestFlag is applied to the new Subscription;
//   - when adding the transaction's endpoint fails, the ID is appended back to
//     the pool and an error is returned.
//
// NOTE(review): the line that picks subId, several fields of the Subscription
// literal and the success return are missing from this listing — confirm
// against the complete file.
286 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
287 if len(r.subIds) > 0 {
// The head of the pool is removed here.
289 r.subIds = r.subIds[1:]
290 if _, ok := r.register[subId]; ok == true {
// ID is still in use: put it back at the tail of the pool.
291 r.subIds = append(r.subIds, subId)
292 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
294 subs := &Subscription{
297 RMRRouteCreated: rmrRoutecreated,
298 SubReqMsg: subReqMsg,
303 RetryFromXapp: false,
307 DoNotWaitSubResp: false,
309 subs.ReqId.Id = subReqMsg.RequestId.Id
310 subs.ReqId.InstanceId = subId
311 r.SetResetTestFlag(resetTestFlag, subs)
313 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
// Endpoint could not be added: release the ID again.
314 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
315 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
319 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that IsMergeable with
// the given transaction and request. For a mergeable candidate it checks
// subs.valid and the size of the candidate's endpoint list, then tries to add
// the transaction's endpoint to that list. A failed add is logged as
// "Subs with requesting endpoint found"; "Mergeable subs found" is logged on
// the other visible path.
// NOTE(review): the return statements and the bodies of the valid/size checks
// are missing from this listing, so the exact meaning of the returned bool is
// not visible here. AssignToSubscription stores it in a variable named
// endPointFound — confirm against the complete file.
322 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
324 for _, subs := range r.register {
325 if subs.IsMergeable(trans, subReqMsg) {
328 // check if there has been race conditions
331 //subs has been set to invalid
332 if subs.valid == false {
336 // If size is zero, entry is to be deleted
337 if subs.EpList.Size() == 0 {
341 // Try to add to endpointlist. Adding fails if endpoint is already in the list
342 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
344 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
349 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the requesting xApp transaction to an E2
// subscription and returns that subscription together with an ErrorInfo.
//
// Steps visible in this listing:
//   - CheckActionTypes validates the request; on the failure path the detailed
//     error is logged at debug level and replaced by
//     "E2 content validation failed".
//   - For a Policy request, a subscription already registered under
//     trans.GetSubId() is updated in place (new SubReqMsg, PolicyUpdate set,
//     SetCachedResponse(nil, true), SetResetTestFlag) and returned.
//   - Otherwise findExistingSubs looks for a mergeable subscription. On the
//     path that calls allocateSubs, a failure is logged and replaced by
//     "subscription not allocated".
//   - When the requesting endpoint was already part of the found subscription,
//     RetryFromXapp is set, cDuplicateE2SubReq is counted and the subscription
//     is returned without further changes.
//   - When createRMRRoute is true, RouteCreate or RouteCreateUpdate is called.
//     On the visible failure path the subscription ID is appended back to
//     r.subIds, the endpoint is removed again and the error is returned.
//   - Finally the subscription is stored in r.register under its instance ID.
//
// Both the registry mutex and the subscription mutex are released by deferred
// Unlock calls.
// NOTE(review): several branch conditions and closing lines are missing from
// this listing (for example which of RouteCreate/RouteCreateUpdate is chosen
// and when the ID is released) — confirm against the complete file.
356 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
359 errorInfo := ErrorInfo{}
361 defer r.mutex.Unlock()
364 // Check validity of subscription action types
366 actionType, err := r.CheckActionTypes(subReqMsg)
368 xapp.Logger.Debug("CREATE %s", err)
// The caller receives a generic error; the detailed cause is only logged above.
369 err = fmt.Errorf("E2 content validation failed")
370 return nil, errorInfo, err
374 // Find possible existing Policy subscription
376 if actionType == e2ap.E2AP_ActionTypePolicy {
377 if subs, ok := r.register[trans.GetSubId()]; ok {
378 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
379 // Update message data to subscription
380 subs.SubReqMsg = subReqMsg
381 subs.PolicyUpdate = true
382 subs.SetCachedResponse(nil, true)
383 r.SetResetTestFlag(resetTestFlag, subs)
384 return subs, errorInfo, nil
388 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
390 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
391 xapp.Logger.Error("%s", err.Error())
// As above, the detailed allocation error is replaced by a generic one.
392 err = fmt.Errorf("subscription not allocated")
393 return nil, errorInfo, err
396 } else if endPointFound == true {
397 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
398 subs.RetryFromXapp = true
399 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
400 c.UpdateCounter(cDuplicateE2SubReq)
401 return subs, errorInfo, nil
405 // Add to subscription
408 defer subs.mutex.Unlock()
410 epamount := subs.EpList.Size()
411 xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
415 // Subscription route updates
417 if createRMRRoute == true {
419 errorInfo, err = r.RouteCreate(subs, c)
421 errorInfo, err = r.RouteCreateUpdate(subs, c)
424 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
// Failure path: give the subscription ID back to the pool of free IDs.
430 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
432 // Delete already added endpoint for the request
433 subs.EpList.DelEndpoint(trans.GetEndpoint())
434 return nil, errorInfo, err
// Success path: make the subscription visible in the registry.
438 r.register[subs.ReqId.InstanceId] = subs
440 xapp.Logger.Debug("CREATE %s", subs.String())
441 xapp.Logger.Debug("Registry: substable=%v", r.register)
442 return subs, errorInfo, nil
445 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
446 errorInfo := ErrorInfo{}
447 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
448 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
450 if strings.Contains(err.Error(), "status 400") {
451 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
453 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
455 errorInfo.ErrorCause = err.Error()
456 c.UpdateCounter(cRouteCreateFail)
457 xapp.Logger.Error("%s", err.Error())
458 err = fmt.Errorf("RTMGR route create failure")
460 return errorInfo, err
463 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
464 errorInfo := ErrorInfo{}
465 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
466 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
468 if strings.Contains(err.Error(), "status 400") {
469 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
471 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
473 errorInfo.ErrorCause = err.Error()
474 c.UpdateCounter(cRouteCreateUpdateFail)
475 xapp.Logger.Error("%s", err.Error())
476 err = fmt.Errorf("RTMGR route update failure")
477 return errorInfo, err
479 c.UpdateCounter(cMergedSubscriptions)
480 return errorInfo, err
483 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
484 var reportFound bool = false
485 var policyFound bool = false
486 var insertFound bool = false
488 for _, acts := range subReqMsg.ActionSetups {
489 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
492 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
495 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
499 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
500 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
502 if reportFound == true {
503 return e2ap.E2AP_ActionTypeReport, nil
505 if policyFound == true {
506 return e2ap.E2AP_ActionTypePolicy, nil
508 if insertFound == true {
509 return e2ap.E2AP_ActionTypeInsert, nil
511 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription detaches the transaction's endpoint from subs and
// cleans up whatever is no longer needed.
//
// Steps visible in this listing:
//   - the endpoint is removed from subs.EpList and the remaining endpoint
//     count is captured;
//   - when waitRouteClean is positive the function sleeps for that long so
//     that the response can reach the xApp via RMR before the route is
//     cleaned;
//   - on the cleanup path the route is deleted (only when RMRRouteCreated is
//     set), the subscription is removed from the database, dropped from
//     r.register and its ID is appended back to r.subIds;
//   - when endpoints remain, the route is updated (only when RMRRouteCreated
//     is set), the subscription is written to the database and
//     cUnmergedSubscriptions is counted.
//
// NOTE(review): the sleep at waitRouteClean happens after both deferred Unlock
// calls have been registered, so it presumably runs with the registry and
// subscription mutexes held — confirm against the complete file.
// NOTE(review): the branch conditions on delStatus/epamount and all return
// statements are missing from this listing.
514 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
516 xapp.Logger.Debug("RemoveFromSubscription %s", idstring(nil, trans, subs, trans))
518 defer r.mutex.Unlock()
520 defer subs.mutex.Unlock()
522 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
523 epamount := subs.EpList.Size()
525 subId := subs.ReqId.InstanceId
526 if delStatus == false {
530 if waitRouteClean > 0 {
531 // Wait here that response is delivered to xApp via RMR before route is cleaned
532 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
533 time.Sleep(waitRouteClean)
536 xapp.Logger.Debug("CLEAN %s", subs.String())
540 // Subscription route delete
542 if subs.RMRRouteCreated == true {
543 r.RouteDelete(subs, trans, c)
546 // Not merged subscription is being deleted
547 xapp.Logger.Debug("Subscription route delete RemoveSubscriptionFromDb")
548 c.RemoveSubscriptionFromDb(subs)
551 // Subscription release
554 if _, ok := r.register[subId]; ok {
555 xapp.Logger.Debug("RELEASE %s", subs.String())
556 delete(r.register, subId)
557 xapp.Logger.Debug("Registry: substable=%v", r.register)
// The released ID goes back to the tail of the pool of free IDs.
559 r.subIds = append(r.subIds, subId)
560 } else if subs.EpList.Size() > 0 {
562 // Subscription route update
564 if subs.RMRRouteCreated == true {
565 r.RouteDeleteUpdate(subs, c)
568 // Endpoint of merged subscription is being deleted
569 xapp.Logger.Debug("Subscription route update WriteSubscriptionToDb")
570 c.WriteSubscriptionToDb(subs)
571 c.UpdateCounter(cUnmergedSubscriptions)
576 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
577 tmpList := xapp.RmrEndpointList{}
578 tmpList.AddEndpoint(trans.GetEndpoint())
579 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
580 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
581 c.UpdateCounter(cRouteDeleteFail)
585 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
586 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
587 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
588 c.UpdateCounter(cRouteDeleteUpdateFail)
592 func (r *Registry) GetSubscription(subId uint32) *Subscription {
594 defer r.mutex.Unlock()
595 if _, ok := r.register[subId]; ok {
596 return r.register[subId]
601 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
603 defer r.mutex.Unlock()
604 for _, subId := range subIds {
605 if _, ok := r.register[subId]; ok {
606 return r.register[subId], nil
609 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
612 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
613 if resetTestFlag == true {
614 // This is used in submgr restart unit tests
615 xapp.Logger.Debug("resetTestFlag == true")
616 subs.DoNotWaitSubResp = true
618 xapp.Logger.Debug("resetTestFlag == false")
// DeleteAllE2Subscriptions removes the subscriptions of the E2 node ranName.
//
// First pass, over r.register: subscriptions whose Meid.RanName equals ranName
// are handled. One with a non-zero OngoingReqCount or OngoingDelCount is only
// logged as still being processed. For the others, when RMRRouteCreated is
// set a route delete request is sent per endpoint (failures are counted as
// cRouteDeleteFail), then the subscription is dropped from r.register, its ID
// is appended back to r.subIds and it is removed from the database.
//
// Second pass, over r.restSubscriptions: REST subscriptions are deleted from
// the registry and from the database unless the ongoing-request condition
// below holds.
//
// NOTE(review): the end of this function and the lines following the two
// "cannot delete it yet" log calls are missing from this listing — confirm
// against the complete file.
622 func (r *Registry) DeleteAllE2Subscriptions(ranName string, c *Control) {
624 xapp.Logger.Debug("Registry: DeleteAllE2Subscriptions()")
625 for subId, subs := range r.register {
626 if subs.Meid.RanName == ranName {
627 if subs.OngoingReqCount != 0 || subs.OngoingDelCount != 0 {
628 // Subscription creation or deletion processes need to be processed gracefully till the end.
629 // Subscription is deleted at end of the process in both cases.
630 xapp.Logger.Debug("Registry: E2 subscription under prosessing ongoing cannot delete it yet. subId=%v, OngoingReqCount=%v, OngoingDelCount=%v", subId, subs.OngoingReqCount, subs.OngoingDelCount)
634 if subs.RMRRouteCreated == true {
635 for _, ep := range subs.EpList.Endpoints {
636 tmpList := xapp.RmrEndpointList{}
// NOTE(review): this passes the address of the range variable; verify that
// AddEndpoint copies the value rather than retaining the pointer.
637 tmpList.AddEndpoint(&ep)
638 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
639 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
640 c.UpdateCounter(cRouteDeleteFail)
644 // Delete E2 subscription from registry and db
645 xapp.Logger.Debug("Registry: Subscription delete. subId=%v", subId)
646 delete(r.register, subId)
647 r.subIds = append(r.subIds, subId)
648 c.RemoveSubscriptionFromDb(subs)
653 // Delete REST subscription from registry and db
654 for restSubId, restSubs := range r.restSubscriptions {
// NOTE(review): && binds tighter than ||, so this condition reads
// (Meid == ranName && SubReqOngoing) || SubDelReqOngoing. A pending delete on
// ANY RAN therefore takes this branch — confirm whether
// Meid == ranName && (SubReqOngoing || SubDelReqOngoing) was intended.
655 if restSubs.Meid == ranName && restSubs.SubReqOngoing == true || restSubs.SubDelReqOngoing == true {
656 // Subscription creation or deletion processes need to be processed gracefully till the end.
657 // Subscription is deleted at end of the process in both cases.
658 xapp.Logger.Debug("Registry: REST subscription under prosessing ongoing cannot delete it yet. RestSubId=%v, SubReqOngoing=%v, SubDelReqOngoing=%v", restSubId, restSubs.SubReqOngoing, restSubs.SubDelReqOngoing)
// NOTE(review): no Meid == ranName check is visible in front of the delete
// below, so REST subscriptions of other RANs appear to be deleted as well —
// confirm against the complete file.
661 xapp.Logger.Debug("Registry: REST subscription delete. subId=%v", restSubId)
662 delete(r.restSubscriptions, restSubId)
663 c.RemoveRESTSubscriptionFromDb(restSubId)