2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
29 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
// RESTSubscription holds the registry's state for one REST subscription.
// Only part of the field list is visible in this view; the methods of this
// type and of Registry also use InstanceIds, Meid, SubReqOngoing,
// SubDelReqOngoing and lastReqMd5sum.
38 type RESTSubscription struct {
// RMR endpoint of the xApp; filled in by Registry.CreateRESTSubscription.
39 xAppRmrEndPoint string
// Lookup table from an xApp event instance ID to an E2 event instance ID.
42 xAppIdToE2Id map[int64]int64
48 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
50 for _, v := range r.InstanceIds {
57 r.InstanceIds = append(r.InstanceIds, instanceId)
60 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
62 r.lastReqMd5sum = md5sum
64 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
68 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
69 r.InstanceIds = r.InstanceIds[1:]
72 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
73 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
76 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
77 return r.xAppIdToE2Id[xAppEventInstanceID]
80 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
81 delete(r.xAppIdToE2Id, xAppEventInstanceID)
// SetProcessed marks the subscription request as no longer being processed.
// NOTE(review): the rest of the body, including how err is used, is not
// visible in this view - confirm against the full file.
84 func (r *RESTSubscription) SetProcessed(err error) {
// Registry.GetRESTSubscription refuses access while this flag is true.
85 r.SubReqOngoing = false
// Registry keeps the E2 subscriptions and the REST subscriptions.
// Only part of the field list is visible in this view; the methods of this
// type also use r.mutex and r.subIds.
91 type Registry struct {
// E2 subscriptions keyed by subscription instance ID.
93 register map[uint32]*Subscription
// Client used for the routing manager route create/update/delete requests.
95 rtmgrClient *RtmgrClient
// REST subscriptions keyed by REST subscription ID.
96 restSubscriptions map[string]*RESTSubscription
99 func (r *Registry) Initialize() {
100 r.register = make(map[uint32]*Subscription)
101 r.restSubscriptions = make(map[string]*RESTSubscription)
104 for i = 1; i < 65535; i++ {
105 r.subIds = append(r.subIds, i)
109 func (r *Registry) GetAllRestSubscriptions() []byte {
111 defer r.mutex.Unlock()
112 restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
114 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
116 return restSubscriptionsJson
119 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
121 defer r.mutex.Unlock()
122 newRestSubscription := RESTSubscription{}
123 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
124 newRestSubscription.Meid = *maid
125 newRestSubscription.SubReqOngoing = true
126 newRestSubscription.SubDelReqOngoing = false
127 r.restSubscriptions[*restSubId] = &newRestSubscription
128 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
129 xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
130 return &newRestSubscription
133 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
135 defer r.mutex.Unlock()
136 delete(r.restSubscriptions, *restSubId)
137 xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
140 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
142 defer r.mutex.Unlock()
143 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
144 // Subscription deletion is not allowed if prosessing subscription request in not ready
145 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
146 if IsDelReqOngoing == true {
147 restSubscription.SubDelReqOngoing = true
149 r.restSubscriptions[restSubId] = restSubscription
150 return restSubscription, nil
152 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
155 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList from the E2 subscriptions in
// the register.
// NOTE(review): some statements of the loop body and the final return are not
// visible in this view.
158 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
160 defer r.mutex.Unlock()
162 resp := models.SubscriptionList{}
163 for _, subs := range r.register {
// One entry per subscription: instance ID, RAN name and client endpoint list.
165 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs reserves a subscription instance ID from the free pool and
// builds a new Subscription for the request carried by trans/subReqMsg.
// It returns an error when the pool is empty, when the reserved ID is already
// present in the register, or when the requesting endpoint cannot be added to
// the new subscription's endpoint list. In the last two cases the ID is put
// back at the tail of the pool.
// NOTE(review): several fields of the Subscription literal and the success
// return are not visible in this view.
171 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
172 if len(r.subIds) > 0 {
// Drop the first pool entry.
174 r.subIds = r.subIds[1:]
175 if _, ok := r.register[subId]; ok == true {
176 r.subIds = append(r.subIds, subId)
177 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
179 subs := &Subscription{
182 RMRRouteCreated: rmrRoutecreated,
183 SubReqMsg: subReqMsg,
188 RetryFromXapp: false,
192 DoNotWaitSubResp: false,
// The request ID comes from the message; the instance ID is the reserved one.
194 subs.ReqId.Id = subReqMsg.RequestId.Id
195 subs.ReqId.InstanceId = subId
196 r.SetResetTestFlag(resetTestFlag, subs)
198 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
199 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
200 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
204 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans the register for a subscription that
// subs.IsMergeable reports as mergeable with the request and tries to add the
// requesting endpoint to it.
// NOTE(review): the return statements and the bodies of the checks below are
// not visible in this view; presumably the bool result tells whether the
// requesting endpoint was already in the subscription - confirm.
207 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
209 for _, subs := range r.register {
210 if subs.IsMergeable(trans, subReqMsg) {
213 // check if there has been race conditions
216 //subs has been set to invalid
217 if subs.valid == false {
221 // If size is zero, entry is to be deleted
222 if subs.EpList.Size() == 0 {
226 // Try to add to endpointlist. Adding fails if endpoint is already in the list
227 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
229 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
234 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the request in trans/subReqMsg to a
// subscription and returns that subscription.
// Visible steps: validate the action types; for Policy actions reuse the
// subscription registered under trans.GetSubId(); otherwise look for an
// existing subscription via findExistingSubs or allocate a new one via
// allocateSubs; create or update the RMR route when createRMRRoute is true;
// finally store the subscription in the register.
// The detailed cause of a validation or allocation failure is only logged;
// the caller receives a generic error in its place.
// NOTE(review): several conditions and lock calls are not visible in this
// view, so the exact branch structure must be confirmed in the full file.
241 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
244 errorInfo := ErrorInfo{}
246 defer r.mutex.Unlock()
249 // Check validity of subscription action types
251 actionType, err := r.CheckActionTypes(subReqMsg)
253 xapp.Logger.Debug("CREATE %s", err)
254 err = fmt.Errorf("E2 content validation failed")
255 return nil, errorInfo, err
259 // Find possible existing Policy subscription
261 if actionType == e2ap.E2AP_ActionTypePolicy {
262 if subs, ok := r.register[trans.GetSubId()]; ok {
263 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
264 // Update message data to subscription
265 subs.SubReqMsg = subReqMsg
266 subs.PolicyUpdate = true
267 subs.SetCachedResponse(nil, true)
268 r.SetResetTestFlag(resetTestFlag, subs)
269 return subs, errorInfo, nil
273 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
275 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
276 xapp.Logger.Error("%s", err.Error())
277 err = fmt.Errorf("subscription not allocated")
278 return nil, errorInfo, err
281 } else if endPointFound == true {
282 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
283 subs.RetryFromXapp = true
284 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
285 c.UpdateCounter(cDuplicateE2SubReq)
286 return subs, errorInfo, nil
290 // Add to subscription
293 defer subs.mutex.Unlock()
295 epamount := subs.EpList.Size()
296 xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
300 // Subscription route updates
302 if createRMRRoute == true {
304 errorInfo, err = r.RouteCreate(subs, c)
306 errorInfo, err = r.RouteCreateUpdate(subs, c)
309 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
// Failure path: give the instance ID back to the pool and undo the endpoint add.
315 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
317 // Delete already added endpoint for the request
318 subs.EpList.DelEndpoint(trans.GetEndpoint())
319 return nil, errorInfo, err
323 r.register[subs.ReqId.InstanceId] = subs
325 xapp.Logger.Debug("CREATE %s", subs.String())
326 xapp.Logger.Debug("Registry: substable=%v", r.register)
327 return subs, errorInfo, nil
// RouteCreate asks the routing manager client to create the route for the
// subscription's endpoint list and instance ID.
// On failure it fills ErrorInfo (ErrorCause holds the original error text),
// bumps the cRouteCreateFail counter, logs the original error and returns a
// generic "RTMGR route create failure" error in its place.
// NOTE(review): the lines that decide between TimeoutType and ErrorSource are
// partly elided in this view - confirm the branch structure.
330 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
331 errorInfo := ErrorInfo{}
// The instance ID is narrowed to uint16 for the route request.
332 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
333 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
// An error text containing "status 400" is reported with the RTMGR timeout type.
335 if strings.Contains(err.Error(), "status 400") {
336 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
338 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
340 errorInfo.ErrorCause = err.Error()
341 c.UpdateCounter(cRouteCreateFail)
342 xapp.Logger.Error("%s", err.Error())
343 err = fmt.Errorf("RTMGR route create failure")
345 return errorInfo, err
// RouteCreateUpdate asks the routing manager client to update the route for
// the subscription's endpoint list and instance ID.
// On failure it fills ErrorInfo (ErrorCause holds the original error text),
// bumps the cRouteCreateUpdateFail counter, logs the original error and
// returns a generic "RTMGR route update failure" error. Otherwise the
// cMergedSubscriptions counter is bumped.
// NOTE(review): the lines that decide between TimeoutType and ErrorSource are
// partly elided in this view - confirm the branch structure.
348 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
349 errorInfo := ErrorInfo{}
// The instance ID is narrowed to uint16 for the route request.
350 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
351 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// An error text containing "status 400" is reported with the RTMGR timeout type.
353 if strings.Contains(err.Error(), "status 400") {
354 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
356 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
358 errorInfo.ErrorCause = err.Error()
359 c.UpdateCounter(cRouteCreateUpdateFail)
360 xapp.Logger.Error("%s", err.Error())
361 err = fmt.Errorf("RTMGR route update failure")
362 return errorInfo, err
364 c.UpdateCounter(cMergedSubscriptions)
365 return errorInfo, err
368 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
369 var reportFound bool = false
370 var policyFound bool = false
371 var insertFound bool = false
373 for _, acts := range subReqMsg.ActionSetups {
374 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
377 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
380 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
384 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
385 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
387 if reportFound == true {
388 return e2ap.E2AP_ActionTypeReport, nil
390 if policyFound == true {
391 return e2ap.E2AP_ActionTypePolicy, nil
393 if insertFound == true {
394 return e2ap.E2AP_ActionTypeInsert, nil
396 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription removes the endpoint of trans from subs.
// Visible steps: delete the endpoint from the endpoint list; optionally sleep
// for waitRouteClean; delete the RMR route when one was created; release the
// subscription (remove it from the register and return its instance ID to the
// free pool); or, when endpoints remain, update the RMR route.
// NOTE(review): several conditions, lock calls and the return statements are
// not visible in this view. The repeated deferred unlocks suggest nested
// scopes - confirm the structure and which locks are held during the sleep.
399 // TODO: Works with concurrent calls, but check if can be improved
400 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
403 defer r.mutex.Unlock()
405 defer subs.mutex.Unlock()
407 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
// Values captured right after the endpoint removal.
408 epamount := subs.EpList.Size()
409 subId := subs.ReqId.InstanceId
410 if delStatus == false {
415 if waitRouteClean > 0 {
416 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
417 time.Sleep(waitRouteClean)
421 defer subs.mutex.Unlock()
422 xapp.Logger.Debug("CLEAN %s", subs.String())
426 // Subscription route delete
428 if subs.RMRRouteCreated == true {
429 r.RouteDelete(subs, trans, c)
433 // Subscription release
436 defer r.mutex.Unlock()
438 if _, ok := r.register[subId]; ok {
439 xapp.Logger.Debug("RELEASE %s", subs.String())
440 delete(r.register, subId)
441 xapp.Logger.Debug("Registry: substable=%v", r.register)
// The instance ID goes back to the tail of the free pool.
443 r.subIds = append(r.subIds, subId)
444 } else if subs.EpList.Size() > 0 {
446 // Subscription route update
448 if subs.RMRRouteCreated == true {
449 r.RouteDeleteUpdate(subs, c)
457 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
459 tmpList := xapp.RmrEndpointList{}
460 tmpList.AddEndpoint(trans.GetEndpoint())
461 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
462 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
463 c.UpdateCounter(cRouteDeleteFail)
467 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
468 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
469 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
470 c.UpdateCounter(cRouteDeleteUpdateFail)
// UpdateSubscriptionToDb brings the stored copy of subs in line with its
// endpoint list: the subscription is either removed from the database or
// written back to it.
// NOTE(review): the condition guarding the first branch is not visible in
// this view; presumably it tests epamount == 0 - confirm.
474 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
476 defer r.mutex.Unlock()
478 defer subs.mutex.Unlock()
480 epamount := subs.EpList.Size()
482 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
483 // Not merged subscription is being deleted
484 c.RemoveSubscriptionFromDb(subs)
487 } else if subs.EpList.Size() > 0 {
488 // Endpoint of merged subscription is being deleted
489 c.WriteSubscriptionToDb(subs)
490 c.UpdateCounter(cUnmergedSubscriptions)
494 func (r *Registry) GetSubscription(subId uint32) *Subscription {
496 defer r.mutex.Unlock()
497 if _, ok := r.register[subId]; ok {
498 return r.register[subId]
503 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
505 defer r.mutex.Unlock()
506 for _, subId := range subIds {
507 if _, ok := r.register[subId]; ok {
508 return r.register[subId], nil
511 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
514 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
515 if resetTestFlag == true {
516 // This is used in submgr restart unit tests
517 xapp.Logger.Debug("resetTestFlag == true")
518 subs.DoNotWaitSubResp = true
520 xapp.Logger.Debug("resetTestFlag == false")
// DeleteAllE2Subscriptions removes the E2 subscriptions of ranName from the
// register and the database, deleting their RMR routes first, and then walks
// the REST subscription table.
// NOTE(review): parts of the body (the statements that follow the "cannot
// delete it yet" log lines, and the end of the function) are not visible in
// this view, and no lock acquisition is visible either - confirm both.
524 func (r *Registry) DeleteAllE2Subscriptions(ranName string, c *Control) {
526 xapp.Logger.Debug("Registry: DeleteAllE2Subscriptions()")
527 for subId, subs := range r.register {
528 if subs.Meid.RanName == ranName {
529 if subs.OngoingReqCount != 0 || subs.OngoingDelCount != 0 {
530 // Subscription creation or deletion processes need to be processed gracefully till the end.
531 // Subscription is deleted at end of the process in both cases.
532 xapp.Logger.Debug("Registry: E2 subscription under prosessing ongoing cannot delete it yet. subId=%v, OngoingReqCount=%v, OngoingDelCount=%v", subId, subs.OngoingReqCount, subs.OngoingDelCount)
536 if subs.RMRRouteCreated == true {
// One route delete request is sent per endpoint.
537 for _, ep := range subs.EpList.Endpoints {
538 tmpList := xapp.RmrEndpointList{}
// NOTE(review): &ep is the address of the range variable, which is shared by
// all iterations before Go 1.22 - confirm AddEndpoint does not retain it.
539 tmpList.AddEndpoint(&ep)
540 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
541 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
542 c.UpdateCounter(cRouteDeleteFail)
546 // Delete E2 subscription from registry and db
547 xapp.Logger.Debug("Registry: Subscription delete. subId=%v", subId)
548 delete(r.register, subId)
549 r.subIds = append(r.subIds, subId)
550 c.RemoveSubscriptionFromDb(subs)
555 // Delete REST subscription from registry and db
556 for restSubId, restSubs := range r.restSubscriptions {
// NOTE(review): && binds tighter than ||, so this condition reads
// (Meid == ranName && SubReqOngoing) || SubDelReqOngoing. The Meid test does
// not cover SubDelReqOngoing, and the visible delete below is not restricted
// to ranName - confirm REST subscriptions of other RANs cannot be removed.
557 if restSubs.Meid == ranName && restSubs.SubReqOngoing == true || restSubs.SubDelReqOngoing == true {
558 // Subscription creation or deletion processes need to be processed gracefully till the end.
559 // Subscription is deleted at end of the process in both cases.
560 xapp.Logger.Debug("Registry: REST subscription under prosessing ongoing cannot delete it yet. RestSubId=%v, SubReqOngoing=%v, SubDelReqOngoing=%v", restSubId, restSubs.SubReqOngoing, restSubs.SubDelReqOngoing)
563 xapp.Logger.Debug("Registry: REST subscription delete. subId=%v", restSubId)
564 delete(r.restSubscriptions, restSubId)
565 c.RemoveRESTSubscriptionFromDb(restSubId)