88a4cc5262434bdeb446baaa33d4f1e66eb6d3f6
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "fmt"
25         "strings"
26         "sync"
27         "time"
28
29         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
// RESTSubscription is the registry's bookkeeping record for one REST
// subscription.
//
// Only the exported fields take part in encoding/json serialization; the
// unexported ones are internal to this package.
type RESTSubscription struct {
	// Created is the creation time, formatted as "2006-01-02 15:04:05.000".
	Created string

	// xAppServiceName is the service name of the xApp owning the subscription.
	xAppServiceName string

	// xAppRmrEndPoint is the RMR endpoint given when the subscription was created.
	xAppRmrEndPoint string

	// Meid names the E2 node (RAN name) the subscription targets.
	Meid string

	// InstanceIds lists the E2 subscription instance ids attached to this
	// REST subscription.
	InstanceIds []uint32

	// xAppIdToE2Id maps an xApp event instance id to its E2 event instance id.
	xAppIdToE2Id map[int64]int64

	// SubReqOngoing is true while a subscription request is being processed.
	SubReqOngoing bool

	// SubDelReqOngoing is true once a delete request has claimed the subscription.
	SubDelReqOngoing bool

	// lastReqMd5sum is the checksum of the latest request; it is cleared when
	// processing finishes with an error.
	lastReqMd5sum string
}
49
50 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
51
52         for _, v := range r.InstanceIds {
53                 if v == instanceId {
54                         return
55                 }
56
57         }
58
59         r.InstanceIds = append(r.InstanceIds, instanceId)
60 }
61
62 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
63         if md5sum != "" {
64                 r.lastReqMd5sum = md5sum
65         } else {
66                 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
67         }
68 }
69
70 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
71         r.InstanceIds = r.InstanceIds[1:]
72 }
73
74 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
75         r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
76 }
77
78 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
79         return r.xAppIdToE2Id[xAppEventInstanceID]
80 }
81
82 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
83         delete(r.xAppIdToE2Id, xAppEventInstanceID)
84 }
85
86 func (r *RESTSubscription) SetProcessed(err error) {
87         r.SubReqOngoing = false
88         if err != nil {
89                 r.lastReqMd5sum = ""
90         }
91 }
92
93 type Registry struct {
94         mutex             *sync.Mutex
95         register          map[uint32]*Subscription
96         subIds            []uint32
97         rtmgrClient       *RtmgrClient
98         restSubscriptions map[string]*RESTSubscription
99 }
100
101 func (r *Registry) Initialize() {
102         r.mutex = new(sync.Mutex)
103         r.register = make(map[uint32]*Subscription)
104         r.restSubscriptions = make(map[string]*RESTSubscription)
105
106         var i uint32
107         for i = 1; i < 65535; i++ {
108                 r.subIds = append(r.subIds, i)
109         }
110 }
111
112 func (r *Registry) GetAllRestSubscriptionsJson() []byte {
113
114         r.mutex.Lock()
115         defer r.mutex.Unlock()
116         restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
117         if err != nil {
118                 xapp.Logger.Error("GetAllRestSubscriptions() json.Marshal error: %v", err)
119         }
120         return restSubscriptionsJson
121 }
122
123 func (r *Registry) GetAllE2NodeRestSubscriptionsJson(ranName string) []byte {
124
125         restSubscriptions := r.GetAllE2NodeRestSubscriptions(ranName)
126         e2NodeRestSubscriptionsJson, err := json.Marshal(restSubscriptions)
127         if err != nil {
128                 xapp.Logger.Error("GetE2NodeRestSubscriptions() json.Marshal error: %v", err)
129         }
130         return e2NodeRestSubscriptionsJson
131 }
132
133 func (r *Registry) GetAllE2NodeRestSubscriptions(ranName string) map[string]RESTSubscription {
134
135         r.mutex.Lock()
136         defer r.mutex.Unlock()
137         var restSubscriptions map[string]RESTSubscription
138         restSubscriptions = make(map[string]RESTSubscription)
139         for restSubsId, restSubscription := range r.restSubscriptions {
140                 if restSubscription.Meid == ranName {
141                         restSubscriptions[restSubsId] = *restSubscription
142                 }
143         }
144         return restSubscriptions
145 }
146
147 func (r *Registry) GetAllXappsJson() []byte {
148
149         r.mutex.Lock()
150         var xappList []string
151         var xappsMap map[string]string
152         xappsMap = make(map[string]string)
153         for _, restSubscription := range r.restSubscriptions {
154                 _, ok := xappsMap[restSubscription.xAppServiceName]
155                 if !ok {
156                         xappsMap[restSubscription.xAppServiceName] = restSubscription.xAppServiceName
157                         xappList = append(xappList, restSubscription.xAppServiceName)
158                 }
159         }
160         r.mutex.Unlock()
161
162         xappsJson, err := json.Marshal(xappList)
163         if err != nil {
164                 xapp.Logger.Error("GetXapps() json.Marshal error: %v", err)
165         }
166         return xappsJson
167 }
168
169 func (r *Registry) GetAllXapps() map[string]string {
170
171         r.mutex.Lock()
172         defer r.mutex.Unlock()
173         var xappsMap map[string]string
174         xappsMap = make(map[string]string)
175         for _, restSubscription := range r.restSubscriptions {
176                 _, ok := xappsMap[restSubscription.xAppServiceName]
177                 if !ok {
178                         xappsMap[restSubscription.xAppServiceName] = restSubscription.xAppServiceName
179                 }
180         }
181         return xappsMap
182 }
183
184 func (r *Registry) GetAllXappRestSubscriptionsJson(xAppServiceName string) []byte {
185
186         xappRestSubscriptions := r.GetAllXappRestSubscriptions(xAppServiceName)
187         xappRestSubscriptionsJson, err := json.Marshal(xappRestSubscriptions)
188         if err != nil {
189                 xapp.Logger.Error("GetXappRestSubscriptions() json.Marshal error: %v", err)
190         }
191         return xappRestSubscriptionsJson
192 }
193
194 func (r *Registry) GetAllXappRestSubscriptions(xAppServiceName string) map[string]RESTSubscription {
195
196         r.mutex.Lock()
197         defer r.mutex.Unlock()
198         var xappRestSubscriptions map[string]RESTSubscription
199         xappRestSubscriptions = make(map[string]RESTSubscription)
200         for restSubsId, xappRestSubscription := range r.restSubscriptions {
201                 if xappRestSubscription.xAppServiceName == xAppServiceName {
202                         xappRestSubscriptions[restSubsId] = *xappRestSubscription
203                 }
204         }
205         return xappRestSubscriptions
206 }
207
208 func (r *Registry) GetE2SubscriptionsJson(restSubsId string) ([]byte, error) {
209
210         // Get all E2 subscriptions of a REST subscription
211         restSubs, err := r.GetRESTSubscription(restSubsId, false)
212         if err != nil {
213                 return nil, err
214         }
215
216         r.mutex.Lock()
217         var e2Subscriptions []Subscription
218         for _, e2SubId := range restSubs.InstanceIds {
219                 e2Subscription, ok := r.register[e2SubId]
220                 if ok {
221                         e2Subscriptions = append(e2Subscriptions, *e2Subscription)
222                 }
223         }
224         r.mutex.Unlock()
225         e2SubscriptionsJson, err := json.Marshal(e2Subscriptions)
226         if err != nil {
227                 xapp.Logger.Error("GetE2Subscriptions() json.Marshal error: %v", err)
228         }
229         return e2SubscriptionsJson, nil
230 }
231
232 func (r *Registry) CreateRESTSubscription(restSubId *string, xappServiceName *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
233         r.mutex.Lock()
234         defer r.mutex.Unlock()
235         newRestSubscription := RESTSubscription{}
236         newRestSubscription.Created = time.Now().Format("2006-01-02 15:04:05.000")
237         newRestSubscription.xAppServiceName = *xappServiceName
238         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
239         newRestSubscription.Meid = *maid
240         newRestSubscription.SubReqOngoing = true
241         newRestSubscription.SubDelReqOngoing = false
242         r.restSubscriptions[*restSubId] = &newRestSubscription
243         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
244         xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
245         return &newRestSubscription
246 }
247
248 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
249         r.mutex.Lock()
250         defer r.mutex.Unlock()
251         delete(r.restSubscriptions, *restSubId)
252         xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
253 }
254
255 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
256         r.mutex.Lock()
257         defer r.mutex.Unlock()
258         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
259                 // Subscription deletion is not allowed if prosessing subscription request in not ready
260                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
261                         if IsDelReqOngoing == true {
262                                 restSubscription.SubDelReqOngoing = true
263                         }
264                         r.restSubscriptions[restSubId] = restSubscription
265                         return restSubscription, nil
266                 } else {
267                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
268                 }
269         }
270         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
271 }
272
273 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
274         r.mutex.Lock()
275         defer r.mutex.Unlock()
276
277         resp := models.SubscriptionList{}
278         for _, subs := range r.register {
279                 subs.mutex.Lock()
280                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
281                 subs.mutex.Unlock()
282         }
283         return resp, nil
284 }
285
286 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
287         if len(r.subIds) > 0 {
288                 subId := r.subIds[0]
289                 r.subIds = r.subIds[1:]
290                 if _, ok := r.register[subId]; ok == true {
291                         r.subIds = append(r.subIds, subId)
292                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
293                 }
294                 subs := &Subscription{
295                         registry:         r,
296                         Meid:             trans.Meid,
297                         RMRRouteCreated:  rmrRoutecreated,
298                         SubReqMsg:        subReqMsg,
299                         OngoingReqCount:  0,
300                         OngoingDelCount:  0,
301                         valid:            true,
302                         PolicyUpdate:     false,
303                         RetryFromXapp:    false,
304                         SubRespRcvd:      false,
305                         DeleteFromDb:     false,
306                         NoRespToXapp:     false,
307                         DoNotWaitSubResp: false,
308                 }
309                 subs.ReqId.Id = subReqMsg.RequestId.Id
310                 subs.ReqId.InstanceId = subId
311                 r.SetResetTestFlag(resetTestFlag, subs)
312
313                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
314                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
315                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
316                 }
317                 return subs, nil
318         }
319         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
320 }
321
322 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
323
324         for _, subs := range r.register {
325                 if subs.IsMergeable(trans, subReqMsg) {
326
327                         //
328                         // check if there has been race conditions
329                         //
330                         subs.mutex.Lock()
331                         //subs has been set to invalid
332                         if subs.valid == false {
333                                 subs.mutex.Unlock()
334                                 continue
335                         }
336                         // If size is zero, entry is to be deleted
337                         if subs.EpList.Size() == 0 {
338                                 subs.mutex.Unlock()
339                                 continue
340                         }
341                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
342                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
343                                 subs.mutex.Unlock()
344                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
345                                 return subs, true
346                         }
347                         subs.mutex.Unlock()
348
349                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
350                         return subs, false
351                 }
352         }
353         return nil, false
354 }
355
356 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
357         var err error
358         var newAlloc bool
359         errorInfo := ErrorInfo{}
360         r.mutex.Lock()
361         defer r.mutex.Unlock()
362
363         //
364         // Check validity of subscription action types
365         //
366         actionType, err := r.CheckActionTypes(subReqMsg)
367         if err != nil {
368                 xapp.Logger.Debug("CREATE %s", err)
369                 err = fmt.Errorf("E2 content validation failed")
370                 return nil, errorInfo, err
371         }
372
373         //
374         // Find possible existing Policy subscription
375         //
376         if actionType == e2ap.E2AP_ActionTypePolicy {
377                 if subs, ok := r.register[trans.GetSubId()]; ok {
378                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
379                         // Update message data to subscription
380                         subs.SubReqMsg = subReqMsg
381                         subs.PolicyUpdate = true
382                         subs.SetCachedResponse(nil, true)
383                         r.SetResetTestFlag(resetTestFlag, subs)
384                         return subs, errorInfo, nil
385                 }
386         }
387
388         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
389         if subs == nil {
390                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
391                         xapp.Logger.Error("%s", err.Error())
392                         err = fmt.Errorf("subscription not allocated")
393                         return nil, errorInfo, err
394                 }
395                 newAlloc = true
396         } else if endPointFound == true {
397                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
398                 subs.RetryFromXapp = true
399                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
400                 c.UpdateCounter(cDuplicateE2SubReq)
401                 return subs, errorInfo, nil
402         }
403
404         //
405         // Add to subscription
406         //
407         subs.mutex.Lock()
408         defer subs.mutex.Unlock()
409
410         epamount := subs.EpList.Size()
411         xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
412
413         r.mutex.Unlock()
414         //
415         // Subscription route updates
416         //
417         if createRMRRoute == true {
418                 if epamount == 1 {
419                         errorInfo, err = r.RouteCreate(subs, c)
420                 } else {
421                         errorInfo, err = r.RouteCreateUpdate(subs, c)
422                 }
423         } else {
424                 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
425         }
426         r.mutex.Lock()
427
428         if err != nil {
429                 if newAlloc {
430                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
431                 }
432                 // Delete already added endpoint for the request
433                 subs.EpList.DelEndpoint(trans.GetEndpoint())
434                 return nil, errorInfo, err
435         }
436
437         if newAlloc {
438                 r.register[subs.ReqId.InstanceId] = subs
439         }
440         xapp.Logger.Debug("CREATE %s", subs.String())
441         xapp.Logger.Debug("Registry: substable=%v", r.register)
442         return subs, errorInfo, nil
443 }
444
445 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
446         errorInfo := ErrorInfo{}
447         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
448         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
449         if err != nil {
450                 if strings.Contains(err.Error(), "status 400") {
451                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
452                 } else {
453                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
454                 }
455                 errorInfo.ErrorCause = err.Error()
456                 c.UpdateCounter(cRouteCreateFail)
457                 xapp.Logger.Error("%s", err.Error())
458                 err = fmt.Errorf("RTMGR route create failure")
459         }
460         return errorInfo, err
461 }
462
463 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
464         errorInfo := ErrorInfo{}
465         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
466         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
467         if err != nil {
468                 if strings.Contains(err.Error(), "status 400") {
469                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
470                 } else {
471                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
472                 }
473                 errorInfo.ErrorCause = err.Error()
474                 c.UpdateCounter(cRouteCreateUpdateFail)
475                 xapp.Logger.Error("%s", err.Error())
476                 err = fmt.Errorf("RTMGR route update failure")
477                 return errorInfo, err
478         }
479         c.UpdateCounter(cMergedSubscriptions)
480         return errorInfo, err
481 }
482
483 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
484         var reportFound bool = false
485         var policyFound bool = false
486         var insertFound bool = false
487
488         for _, acts := range subReqMsg.ActionSetups {
489                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
490                         reportFound = true
491                 }
492                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
493                         policyFound = true
494                 }
495                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
496                         insertFound = true
497                 }
498         }
499         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
500                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
501         }
502         if reportFound == true {
503                 return e2ap.E2AP_ActionTypeReport, nil
504         }
505         if policyFound == true {
506                 return e2ap.E2AP_ActionTypePolicy, nil
507         }
508         if insertFound == true {
509                 return e2ap.E2AP_ActionTypeInsert, nil
510         }
511         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
512 }
513
514 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
515
516         xapp.Logger.Debug("RemoveFromSubscription %s", idstring(nil, trans, subs, trans))
517         r.mutex.Lock()
518         defer r.mutex.Unlock()
519         subs.mutex.Lock()
520         defer subs.mutex.Unlock()
521
522         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
523         epamount := subs.EpList.Size()
524
525         subId := subs.ReqId.InstanceId
526         if delStatus == false {
527                 return nil
528         }
529
530         if waitRouteClean > 0 {
531                 // Wait here that response is delivered to xApp via RMR before route is cleaned
532                 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
533                 r.mutex.Unlock()
534                 time.Sleep(waitRouteClean)
535                 r.mutex.Lock()
536         }
537
538         xapp.Logger.Debug("CLEAN %s", subs.String())
539
540         if epamount == 0 {
541                 //
542                 // Subscription route delete
543                 //
544                 if subs.RMRRouteCreated == true {
545                         r.RouteDelete(subs, trans, c)
546                 }
547
548                 // Not merged subscription is being deleted
549                 xapp.Logger.Debug("Subscription route delete RemoveSubscriptionFromDb")
550                 c.RemoveSubscriptionFromDb(subs)
551
552                 //
553                 // Subscription release
554                 //
555
556                 if _, ok := r.register[subId]; ok {
557                         xapp.Logger.Debug("RELEASE %s", subs.String())
558                         delete(r.register, subId)
559                         xapp.Logger.Debug("Registry: substable=%v", r.register)
560                 }
561                 r.subIds = append(r.subIds, subId)
562         } else if subs.EpList.Size() > 0 {
563                 //
564                 // Subscription route update
565                 //
566                 if subs.RMRRouteCreated == true {
567                         r.RouteDeleteUpdate(subs, c)
568                 }
569
570                 // Endpoint of merged subscription is being deleted
571                 xapp.Logger.Debug("Subscription route update WriteSubscriptionToDb")
572                 err := c.WriteSubscriptionToDb(subs)
573                 if err != nil {
574                         xapp.Logger.Error("tracker.UnTrackTransaction() failed:%s", err.Error())
575                 }
576                 c.UpdateCounter(cUnmergedSubscriptions)
577         }
578         return nil
579 }
580
581 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
582         tmpList := xapp.RmrEndpointList{}
583         tmpList.AddEndpoint(trans.GetEndpoint())
584         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
585         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
586                 c.UpdateCounter(cRouteDeleteFail)
587         }
588 }
589
590 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
591         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
592         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
593                 c.UpdateCounter(cRouteDeleteUpdateFail)
594         }
595 }
596
597 func (r *Registry) GetSubscription(subId uint32) *Subscription {
598         r.mutex.Lock()
599         defer r.mutex.Unlock()
600         if _, ok := r.register[subId]; ok {
601                 return r.register[subId]
602         }
603         return nil
604 }
605
606 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
607         r.mutex.Lock()
608         defer r.mutex.Unlock()
609         for _, subId := range subIds {
610                 if _, ok := r.register[subId]; ok {
611                         return r.register[subId], nil
612                 }
613         }
614         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
615 }
616
617 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
618         if resetTestFlag == true {
619                 // This is used in submgr restart unit tests
620                 xapp.Logger.Debug("resetTestFlag == true")
621                 subs.DoNotWaitSubResp = true
622         } else {
623                 xapp.Logger.Debug("resetTestFlag == false")
624         }
625 }
626
627 func (r *Registry) DeleteAllE2Subscriptions(ranName string, c *Control) {
628
629         xapp.Logger.Debug("Registry: DeleteAllE2Subscriptions()")
630         for subId, subs := range r.register {
631                 if subs.Meid.RanName == ranName {
632                         if subs.OngoingReqCount != 0 || subs.OngoingDelCount != 0 {
633                                 // Subscription creation or deletion processes need to be processed gracefully till the end.
634                                 // Subscription is deleted at end of the process in both cases.
635                                 xapp.Logger.Debug("Registry: E2 subscription under prosessing ongoing cannot delete it yet. subId=%v, OngoingReqCount=%v, OngoingDelCount=%v", subId, subs.OngoingReqCount, subs.OngoingDelCount)
636                                 continue
637                         } else {
638                                 // Delete route
639                                 if subs.RMRRouteCreated == true {
640                                         for _, ep := range subs.EpList.Endpoints {
641                                                 tmpList := xapp.RmrEndpointList{}
642                                                 tmpList.AddEndpoint(&ep)
643                                                 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
644                                                 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
645                                                         c.UpdateCounter(cRouteDeleteFail)
646                                                 }
647                                         }
648                                 }
649                                 // Delete E2 subscription from registry and db
650                                 xapp.Logger.Debug("Registry: Subscription delete. subId=%v", subId)
651                                 delete(r.register, subId)
652                                 r.subIds = append(r.subIds, subId)
653                                 c.RemoveSubscriptionFromDb(subs)
654                         }
655                 }
656         }
657
658         // Delete REST subscription from registry and db
659         for restSubId, restSubs := range r.restSubscriptions {
660                 if restSubs.Meid == ranName {
661                         if restSubs.SubReqOngoing == true || restSubs.SubDelReqOngoing == true {
662                                 // Subscription creation or deletion processes need to be processed gracefully till the end.
663                                 // Subscription is deleted at end of the process in both cases.
664                                 xapp.Logger.Debug("Registry: REST subscription under prosessing ongoing cannot delete it yet. RestSubId=%v, SubReqOngoing=%v, SubDelReqOngoing=%v", restSubId, restSubs.SubReqOngoing, restSubs.SubDelReqOngoing)
665                                 continue
666                         } else {
667                                 xapp.Logger.Debug("Registry: REST subscription delete. subId=%v", restSubId)
668                                 delete(r.restSubscriptions, restSubId)
669                                 c.RemoveRESTSubscriptionFromDb(restSubId)
670                         }
671                 }
672         }
673 }