2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
29 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// RESTSubscription is the registry-side record of one REST subscription.
// Only part of the field list is visible in this view. The methods in this
// file also read or write Meid, InstanceIds, SubReqOngoing, SubDelReqOngoing
// and lastReqMd5sum, which are presumably declared on lines not shown here.
// NOTE(review): confirm against the complete struct definition.
38 type RESTSubscription struct {
// xAppRmrEndPoint is filled from the xAppRmrEndPoint argument of
// Registry.CreateRESTSubscription.
39 xAppRmrEndPoint string
// xAppIdToE2Id maps an xApp event instance ID to an E2 event instance ID.
// It is unexported, so encoding/json does not include it when the registry
// marshals its REST subscriptions.
42 xAppIdToE2Id map[int64]int64
48 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
50 for _, v := range r.InstanceIds {
57 r.InstanceIds = append(r.InstanceIds, instanceId)
60 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
62 r.lastReqMd5sum = md5sum
64 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
68 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
69 r.InstanceIds = r.InstanceIds[1:]
72 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
73 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
76 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
77 return r.xAppIdToE2Id[xAppEventInstanceID]
80 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
81 delete(r.xAppIdToE2Id, xAppEventInstanceID)
// SetProcessed marks the subscription request as no longer in progress by
// clearing SubReqOngoing.
// NOTE(review): err is not used on the lines visible here; any handling that
// depends on err would be on lines not shown in this view — confirm.
84 func (r *RESTSubscription) SetProcessed(err error) {
85 r.SubReqOngoing = false
// Registry keeps the E2 subscriptions and REST subscriptions known to this
// process. Methods in this file also use r.mutex and r.subIds, which are
// presumably declared on lines not shown in this view.
// NOTE(review): confirm against the complete struct definition.
91 type Registry struct {
// register maps an E2 subscription instance ID to its Subscription.
93 register map[uint32]*Subscription
// rtmgrClient is used by the Route* methods to create, update and delete
// subscription routes.
95 rtmgrClient *RtmgrClient
// restSubscriptions maps a REST subscription ID to its RESTSubscription.
96 restSubscriptions map[string]*RESTSubscription
99 func (r *Registry) Initialize() {
100 r.mutex = new(sync.Mutex)
101 r.register = make(map[uint32]*Subscription)
102 r.restSubscriptions = make(map[string]*RESTSubscription)
105 for i = 1; i < 65535; i++ {
106 r.subIds = append(r.subIds, i)
110 func (r *Registry) GetAllRestSubscriptions() []byte {
112 defer r.mutex.Unlock()
113 restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
115 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
117 return restSubscriptionsJson
120 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
122 defer r.mutex.Unlock()
123 newRestSubscription := RESTSubscription{}
124 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
125 newRestSubscription.Meid = *maid
126 newRestSubscription.SubReqOngoing = true
127 newRestSubscription.SubDelReqOngoing = false
128 r.restSubscriptions[*restSubId] = &newRestSubscription
129 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
130 xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
131 return &newRestSubscription
134 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
136 defer r.mutex.Unlock()
137 delete(r.restSubscriptions, *restSubId)
138 xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
141 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
143 defer r.mutex.Unlock()
144 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
145 // Subscription deletion is not allowed if prosessing subscription request in not ready
146 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
147 if IsDelReqOngoing == true {
148 restSubscription.SubDelReqOngoing = true
150 r.restSubscriptions[restSubId] = restSubscription
151 return restSubscription, nil
153 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
156 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList with one entry per
// subscription in r.register. Each entry carries the subscription's instance
// ID (converted to int64), the RAN name from its Meid and the string form of
// its endpoint list.
// NOTE(review): the lock acquisition, the statements around the append and
// the final return are not on the lines visible here — confirm before
// changing locking.
159 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
// The registry mutex is released when the function returns.
161 defer r.mutex.Unlock()
163 resp := models.SubscriptionList{}
// Go map iteration order is unspecified, so the order of entries in resp
// varies between calls.
164 for _, subs := range r.register {
166 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs reserves a free subscription ID and builds a new Subscription
// for subReqMsg with the endpoint of trans attached.
//
// It returns an error when no free ID is left, when the reserved ID is
// already present in r.register, or when the endpoint cannot be added to the
// new subscription. In the latter two cases the ID is appended back to
// r.subIds.
//
// The lines visible here neither lock r.mutex nor insert the subscription
// into r.register; AssignToSubscription calls this function and does the
// registration itself.
// NOTE(review): several lines of this function (including the definition of
// subId and part of the Subscription literal) are not visible in this view.
172 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
173 if len(r.subIds) > 0 {
// The pool is consumed from the front; subId is presumably its former head.
175 r.subIds = r.subIds[1:]
// An ID from the free pool must not be registered already. If it is, hand
// it back to the tail of the pool and fail.
176 if _, ok := r.register[subId]; ok == true {
177 r.subIds = append(r.subIds, subId)
178 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
180 subs := &Subscription{
183 RMRRouteCreated: rmrRoutecreated,
184 SubReqMsg: subReqMsg,
189 RetryFromXapp: false,
193 DoNotWaitSubResp: false,
// The request ID keeps the Id sent in the request; the instance ID is the
// ID reserved from the pool.
195 subs.ReqId.Id = subReqMsg.RequestId.Id
196 subs.ReqId.InstanceId = subId
197 r.SetResetTestFlag(resetTestFlag, subs)
// If the endpoint cannot be added, release the reserved ID again.
199 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
200 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
201 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when the free-ID pool is empty.
205 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs searches r.register for a subscription that
// subs.IsMergeable reports as mergeable with the request carried by trans and
// subReqMsg, and tries to add the endpoint of trans to it.
//
// AssignToSubscription treats a nil *Subscription as "nothing to merge with"
// and the bool result as "the requesting endpoint was already present".
// NOTE(review): most control flow of this function (locking, continue/return
// statements) is not visible in this view; the inline notes below describe
// only what the visible lines show.
208 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
210 for _, subs := range r.register {
211 if subs.IsMergeable(trans, subReqMsg) {
214 // check if there has been race conditions
217 //subs has been set to invalid
218 if subs.valid == false {
222 // If size is zero, entry is to be deleted
223 if subs.EpList.Size() == 0 {
227 // Try to add to endpointlist. Adding fails if endpoint is already in the list
228 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
// Logged when AddEndpoint reported failure, i.e. the endpoint was already listed.
230 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
235 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the request carried by trans and subReqMsg to
// a subscription and returns that subscription.
//
// Steps shown by the visible lines:
//  1. CheckActionTypes validates the action list; on failure the detailed
//     error is logged at debug level and a generic error is returned.
//  2. For a Policy request, an existing subscription registered under
//     trans.GetSubId() is updated in place and returned.
//  3. Otherwise findExistingSubs looks for a mergeable subscription, and
//     allocateSubs creates a new one when needed.
//  4. If the requesting endpoint was already part of the subscription, the
//     subscription is flagged RetryFromXapp, cDuplicateE2SubReq is counted and
//     the subscription is returned.
//  5. When createRMRRoute is set, the route is created or updated through
//     RouteCreate / RouteCreateUpdate.
//  6. On failure the instance ID is put back into r.subIds and the endpoint
//     is removed again; on success the subscription is stored in r.register.
//
// The ErrorInfo result carries whatever RouteCreate / RouteCreateUpdate
// returned; it stays zero-valued on the other paths.
// NOTE(review): many lines (lock acquisition, several branch conditions) are
// not visible in this view — confirm details against the complete function.
242 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
245 errorInfo := ErrorInfo{}
// The registry mutex is released when the function returns.
247 defer r.mutex.Unlock()
250 // Check validity of subscription action types
252 actionType, err := r.CheckActionTypes(subReqMsg)
// The detailed validation error is only logged; callers get a generic one.
254 xapp.Logger.Debug("CREATE %s", err)
255 err = fmt.Errorf("E2 content validation failed")
256 return nil, errorInfo, err
260 // Find possible existing Policy subscription
262 if actionType == e2ap.E2AP_ActionTypePolicy {
263 if subs, ok := r.register[trans.GetSubId()]; ok {
264 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
265 // Update message data to subscription
266 subs.SubReqMsg = subReqMsg
267 subs.PolicyUpdate = true
268 subs.SetCachedResponse(nil, true)
269 r.SetResetTestFlag(resetTestFlag, subs)
270 return subs, errorInfo, nil
274 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
// The detailed allocation error is logged; callers get a generic one.
276 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
277 xapp.Logger.Error("%s", err.Error())
278 err = fmt.Errorf("subscription not allocated")
279 return nil, errorInfo, err
282 } else if endPointFound == true {
283 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
284 subs.RetryFromXapp = true
285 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
286 c.UpdateCounter(cDuplicateE2SubReq)
287 return subs, errorInfo, nil
291 // Add to subscription
// The subscription's own mutex is released when the function returns.
294 defer subs.mutex.Unlock()
296 epamount := subs.EpList.Size()
297 xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
301 // Subscription route updates
303 if createRMRRoute == true {
// NOTE(review): the conditions selecting create versus update are not
// visible; presumably they depend on epamount — confirm.
305 errorInfo, err = r.RouteCreate(subs, c)
307 errorInfo, err = r.RouteCreateUpdate(subs, c)
310 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
// Failure path: give the instance ID back to the free pool.
// NOTE(review): the conditions guarding this line are not visible.
316 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
318 // Delete already added endpoint for the request
319 subs.EpList.DelEndpoint(trans.GetEndpoint())
320 return nil, errorInfo, err
// Success path: (re-)register the subscription under its instance ID.
324 r.register[subs.ReqId.InstanceId] = subs
326 xapp.Logger.Debug("CREATE %s", subs.String())
327 xapp.Logger.Debug("Registry: substable=%v", r.register)
328 return subs, errorInfo, nil
// RouteCreate asks the routing manager client to create the route for subs,
// identified by the subscription's endpoint list and its instance ID
// narrowed to uint16.
//
// On failure the original error text is stored in errorInfo.ErrorCause, the
// cRouteCreateFail counter is updated, the error is logged, and a generic
// "RTMGR route create failure" error is returned in its place.
// NOTE(review): the branch structure around TimeoutType and ErrorSource is
// not fully visible in this view (an else may separate them) — confirm.
331 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
332 errorInfo := ErrorInfo{}
333 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
334 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
// An error whose text contains "status 400" is classified as a routing
// manager timeout.
336 if strings.Contains(err.Error(), "status 400") {
337 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
339 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
341 errorInfo.ErrorCause = err.Error()
342 c.UpdateCounter(cRouteCreateFail)
343 xapp.Logger.Error("%s", err.Error())
// Callers receive a generic error; the detail stays in errorInfo.ErrorCause.
344 err = fmt.Errorf("RTMGR route create failure")
346 return errorInfo, err
// RouteCreateUpdate asks the routing manager client to update the route for
// subs, identified by the subscription's endpoint list and its instance ID
// narrowed to uint16.
//
// On failure the original error text is stored in errorInfo.ErrorCause, the
// cRouteCreateUpdateFail counter is updated, the error is logged, and a
// generic "RTMGR route update failure" error is returned. On the remaining
// path the cMergedSubscriptions counter is updated.
// NOTE(review): the branch structure around TimeoutType and ErrorSource is
// not fully visible in this view (an else may separate them) — confirm.
349 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
350 errorInfo := ErrorInfo{}
351 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
352 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// An error whose text contains "status 400" is classified as a routing
// manager timeout.
354 if strings.Contains(err.Error(), "status 400") {
355 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
357 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
359 errorInfo.ErrorCause = err.Error()
360 c.UpdateCounter(cRouteCreateUpdateFail)
361 xapp.Logger.Error("%s", err.Error())
// Callers receive a generic error; the detail stays in errorInfo.ErrorCause.
362 err = fmt.Errorf("RTMGR route update failure")
363 return errorInfo, err
// Not reached after the failure return above: count the merged subscription.
365 c.UpdateCounter(cMergedSubscriptions)
366 return errorInfo, err
369 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
370 var reportFound bool = false
371 var policyFound bool = false
372 var insertFound bool = false
374 for _, acts := range subReqMsg.ActionSetups {
375 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
378 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
381 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
385 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
386 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
388 if reportFound == true {
389 return e2ap.E2AP_ActionTypeReport, nil
391 if policyFound == true {
392 return e2ap.E2AP_ActionTypePolicy, nil
394 if insertFound == true {
395 return e2ap.E2AP_ActionTypeInsert, nil
397 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription detaches the endpoint of trans from subs and cleans
// up what is left.
//
// Shown by the visible lines:
//   - the endpoint is removed from subs.EpList;
//   - when waitRouteClean is positive the function sleeps for that duration
//     so the response can reach the xApp before routes are cleaned;
//   - for the last endpoint: the route is deleted (if one was created), the
//     subscription is removed from the database and from r.register, and its
//     instance ID goes back to r.subIds;
//   - when endpoints remain: the route is updated (if one was created), the
//     subscription is written to the database and cUnmergedSubscriptions is
//     counted.
//
// NOTE(review): the lock acquisitions, several branch conditions and the
// return statements are not visible in this view — confirm details against
// the complete function.
400 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
402 xapp.Logger.Debug("RemoveFromSubscription %s", idstring(nil, trans, subs, trans))
// Both the registry mutex and the subscription mutex are released on return.
404 defer r.mutex.Unlock()
406 defer subs.mutex.Unlock()
408 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
// epamount is the number of endpoints left after the removal above.
409 epamount := subs.EpList.Size()
411 subId := subs.ReqId.InstanceId
// delStatus is false when DelEndpoint did not remove the endpoint.
412 if delStatus == false {
416 if waitRouteClean > 0 {
417 // Wait here that response is delivered to xApp via RMR before route is cleaned
418 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
// NOTE(review): the deferred unlocks above suggest both mutexes are held
// during this sleep, which would block other registry operations for
// waitRouteClean — confirm.
419 time.Sleep(waitRouteClean)
422 xapp.Logger.Debug("CLEAN %s", subs.String())
426 // Subscription route delete
428 if subs.RMRRouteCreated == true {
429 r.RouteDelete(subs, trans, c)
432 // Not merged subscription is being deleted
433 xapp.Logger.Debug("Subscription route delete RemoveSubscriptionFromDb")
434 c.RemoveSubscriptionFromDb(subs)
437 // Subscription release
// Drop the registry entry (if still present) and recycle its instance ID.
440 if _, ok := r.register[subId]; ok {
441 xapp.Logger.Debug("RELEASE %s", subs.String())
442 delete(r.register, subId)
443 xapp.Logger.Debug("Registry: substable=%v", r.register)
445 r.subIds = append(r.subIds, subId)
446 } else if subs.EpList.Size() > 0 {
448 // Subscription route update
450 if subs.RMRRouteCreated == true {
451 r.RouteDeleteUpdate(subs, c)
454 // Endpoint of merged subscription is being deleted
455 xapp.Logger.Debug("Subscription route update WriteSubscriptionToDb")
456 c.WriteSubscriptionToDb(subs)
457 c.UpdateCounter(cUnmergedSubscriptions)
462 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
463 tmpList := xapp.RmrEndpointList{}
464 tmpList.AddEndpoint(trans.GetEndpoint())
465 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
466 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
467 c.UpdateCounter(cRouteDeleteFail)
471 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
472 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
473 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
474 c.UpdateCounter(cRouteDeleteUpdateFail)
478 func (r *Registry) GetSubscription(subId uint32) *Subscription {
480 defer r.mutex.Unlock()
481 if _, ok := r.register[subId]; ok {
482 return r.register[subId]
487 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
489 defer r.mutex.Unlock()
490 for _, subId := range subIds {
491 if _, ok := r.register[subId]; ok {
492 return r.register[subId], nil
495 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
// SetResetTestFlag sets subs.DoNotWaitSubResp when resetTestFlag is true and
// logs which value of resetTestFlag it saw.
// NOTE(review): the lines of the false branch are only partly visible in this
// view; whether DoNotWaitSubResp is cleared there cannot be told from here —
// confirm.
498 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
499 if resetTestFlag == true {
500 // This is used in submgr restart unit tests
501 xapp.Logger.Debug("resetTestFlag == true")
502 subs.DoNotWaitSubResp = true
504 xapp.Logger.Debug("resetTestFlag == false")
508 func (r *Registry) DeleteAllE2Subscriptions(ranName string, c *Control) {
510 xapp.Logger.Debug("Registry: DeleteAllE2Subscriptions()")
511 for subId, subs := range r.register {
512 if subs.Meid.RanName == ranName {
513 if subs.OngoingReqCount != 0 || subs.OngoingDelCount != 0 {
514 // Subscription creation or deletion processes need to be processed gracefully till the end.
515 // Subscription is deleted at end of the process in both cases.
516 xapp.Logger.Debug("Registry: E2 subscription under prosessing ongoing cannot delete it yet. subId=%v, OngoingReqCount=%v, OngoingDelCount=%v", subId, subs.OngoingReqCount, subs.OngoingDelCount)
520 if subs.RMRRouteCreated == true {
521 for _, ep := range subs.EpList.Endpoints {
522 tmpList := xapp.RmrEndpointList{}
523 tmpList.AddEndpoint(&ep)
524 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
525 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
526 c.UpdateCounter(cRouteDeleteFail)
530 // Delete E2 subscription from registry and db
531 xapp.Logger.Debug("Registry: Subscription delete. subId=%v", subId)
532 delete(r.register, subId)
533 r.subIds = append(r.subIds, subId)
534 c.RemoveSubscriptionFromDb(subs)
539 // Delete REST subscription from registry and db
540 for restSubId, restSubs := range r.restSubscriptions {
541 if restSubs.Meid == ranName && restSubs.SubReqOngoing == true || restSubs.SubDelReqOngoing == true {
542 // Subscription creation or deletion processes need to be processed gracefully till the end.
543 // Subscription is deleted at end of the process in both cases.
544 xapp.Logger.Debug("Registry: REST subscription under prosessing ongoing cannot delete it yet. RestSubId=%v, SubReqOngoing=%v, SubDelReqOngoing=%v", restSubId, restSubs.SubReqOngoing, restSubs.SubDelReqOngoing)
547 xapp.Logger.Debug("Registry: REST subscription delete. subId=%v", restSubId)
548 delete(r.restSubscriptions, restSubId)
549 c.RemoveRESTSubscriptionFromDb(restSubId)