E2 restart handling added
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "fmt"
25         "strings"
26         "sync"
27         "time"
28
29         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
// RESTSubscription is the registry record kept for one REST subscription.
// Only the exported fields are visible to encoding/json.
type RESTSubscription struct {
	// xAppRmrEndPoint is the RMR endpoint string supplied when the record
	// was created.
	xAppRmrEndPoint string

	// Meid is the RAN name the subscription belongs to.
	Meid string

	// InstanceIds lists the E2 instance ids attached to this record.
	InstanceIds []uint32

	// xAppIdToE2Id maps an xApp event instance id to an E2 event instance id.
	xAppIdToE2Id map[int64]int64

	// SubReqOngoing is true while a subscription request is in progress.
	SubReqOngoing bool

	// SubDelReqOngoing is true once a delete request has claimed the record.
	SubDelReqOngoing bool

	// lastReqMd5sum is the checksum stored by AddMd5Sum; SetProcessed
	// clears it when the request failed.
	lastReqMd5sum string
}
47
48 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
49
50         for _, v := range r.InstanceIds {
51                 if v == instanceId {
52                         return
53                 }
54
55         }
56
57         r.InstanceIds = append(r.InstanceIds, instanceId)
58 }
59
60 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
61         if md5sum != "" {
62                 r.lastReqMd5sum = md5sum
63         } else {
64                 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
65         }
66 }
67
68 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
69         r.InstanceIds = r.InstanceIds[1:]
70 }
71
72 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
73         r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
74 }
75
76 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
77         return r.xAppIdToE2Id[xAppEventInstanceID]
78 }
79
80 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
81         delete(r.xAppIdToE2Id, xAppEventInstanceID)
82 }
83
84 func (r *RESTSubscription) SetProcessed(err error) {
85         r.SubReqOngoing = false
86         if err != nil {
87                 r.lastReqMd5sum = ""
88         }
89 }
90
91 type Registry struct {
92         mutex             sync.Mutex
93         register          map[uint32]*Subscription
94         subIds            []uint32
95         rtmgrClient       *RtmgrClient
96         restSubscriptions map[string]*RESTSubscription
97 }
98
99 func (r *Registry) Initialize() {
100         r.register = make(map[uint32]*Subscription)
101         r.restSubscriptions = make(map[string]*RESTSubscription)
102
103         var i uint32
104         for i = 1; i < 65535; i++ {
105                 r.subIds = append(r.subIds, i)
106         }
107 }
108
109 func (r *Registry) GetAllRestSubscriptions() []byte {
110         r.mutex.Lock()
111         defer r.mutex.Unlock()
112         restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
113         if err != nil {
114                 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
115         }
116         return restSubscriptionsJson
117 }
118
119 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
120         r.mutex.Lock()
121         defer r.mutex.Unlock()
122         newRestSubscription := RESTSubscription{}
123         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
124         newRestSubscription.Meid = *maid
125         newRestSubscription.SubReqOngoing = true
126         newRestSubscription.SubDelReqOngoing = false
127         r.restSubscriptions[*restSubId] = &newRestSubscription
128         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
129         xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
130         return &newRestSubscription
131 }
132
133 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
134         r.mutex.Lock()
135         defer r.mutex.Unlock()
136         delete(r.restSubscriptions, *restSubId)
137         xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
138 }
139
140 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
141         r.mutex.Lock()
142         defer r.mutex.Unlock()
143         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
144                 // Subscription deletion is not allowed if prosessing subscription request in not ready
145                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
146                         if IsDelReqOngoing == true {
147                                 restSubscription.SubDelReqOngoing = true
148                         }
149                         r.restSubscriptions[restSubId] = restSubscription
150                         return restSubscription, nil
151                 } else {
152                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
153                 }
154         }
155         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
156 }
157
158 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
159         r.mutex.Lock()
160         defer r.mutex.Unlock()
161
162         resp := models.SubscriptionList{}
163         for _, subs := range r.register {
164                 subs.mutex.Lock()
165                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
166                 subs.mutex.Unlock()
167         }
168         return resp, nil
169 }
170
171 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
172         if len(r.subIds) > 0 {
173                 subId := r.subIds[0]
174                 r.subIds = r.subIds[1:]
175                 if _, ok := r.register[subId]; ok == true {
176                         r.subIds = append(r.subIds, subId)
177                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
178                 }
179                 subs := &Subscription{
180                         registry:         r,
181                         Meid:             trans.Meid,
182                         RMRRouteCreated:  rmrRoutecreated,
183                         SubReqMsg:        subReqMsg,
184                         OngoingReqCount:  0,
185                         OngoingDelCount:  0,
186                         valid:            true,
187                         PolicyUpdate:     false,
188                         RetryFromXapp:    false,
189                         SubRespRcvd:      false,
190                         DeleteFromDb:     false,
191                         NoRespToXapp:     false,
192                         DoNotWaitSubResp: false,
193                 }
194                 subs.ReqId.Id = subReqMsg.RequestId.Id
195                 subs.ReqId.InstanceId = subId
196                 r.SetResetTestFlag(resetTestFlag, subs)
197
198                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
199                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
200                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
201                 }
202                 return subs, nil
203         }
204         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
205 }
206
207 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
208
209         for _, subs := range r.register {
210                 if subs.IsMergeable(trans, subReqMsg) {
211
212                         //
213                         // check if there has been race conditions
214                         //
215                         subs.mutex.Lock()
216                         //subs has been set to invalid
217                         if subs.valid == false {
218                                 subs.mutex.Unlock()
219                                 continue
220                         }
221                         // If size is zero, entry is to be deleted
222                         if subs.EpList.Size() == 0 {
223                                 subs.mutex.Unlock()
224                                 continue
225                         }
226                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
227                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
228                                 subs.mutex.Unlock()
229                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
230                                 return subs, true
231                         }
232                         subs.mutex.Unlock()
233
234                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
235                         return subs, false
236                 }
237         }
238         return nil, false
239 }
240
241 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
242         var err error
243         var newAlloc bool
244         errorInfo := ErrorInfo{}
245         r.mutex.Lock()
246         defer r.mutex.Unlock()
247
248         //
249         // Check validity of subscription action types
250         //
251         actionType, err := r.CheckActionTypes(subReqMsg)
252         if err != nil {
253                 xapp.Logger.Debug("CREATE %s", err)
254                 err = fmt.Errorf("E2 content validation failed")
255                 return nil, errorInfo, err
256         }
257
258         //
259         // Find possible existing Policy subscription
260         //
261         if actionType == e2ap.E2AP_ActionTypePolicy {
262                 if subs, ok := r.register[trans.GetSubId()]; ok {
263                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
264                         // Update message data to subscription
265                         subs.SubReqMsg = subReqMsg
266                         subs.PolicyUpdate = true
267                         subs.SetCachedResponse(nil, true)
268                         r.SetResetTestFlag(resetTestFlag, subs)
269                         return subs, errorInfo, nil
270                 }
271         }
272
273         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
274         if subs == nil {
275                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
276                         xapp.Logger.Error("%s", err.Error())
277                         err = fmt.Errorf("subscription not allocated")
278                         return nil, errorInfo, err
279                 }
280                 newAlloc = true
281         } else if endPointFound == true {
282                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
283                 subs.RetryFromXapp = true
284                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
285                 c.UpdateCounter(cDuplicateE2SubReq)
286                 return subs, errorInfo, nil
287         }
288
289         //
290         // Add to subscription
291         //
292         subs.mutex.Lock()
293         defer subs.mutex.Unlock()
294
295         epamount := subs.EpList.Size()
296         xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
297
298         r.mutex.Unlock()
299         //
300         // Subscription route updates
301         //
302         if createRMRRoute == true {
303                 if epamount == 1 {
304                         errorInfo, err = r.RouteCreate(subs, c)
305                 } else {
306                         errorInfo, err = r.RouteCreateUpdate(subs, c)
307                 }
308         } else {
309                 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
310         }
311         r.mutex.Lock()
312
313         if err != nil {
314                 if newAlloc {
315                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
316                 }
317                 // Delete already added endpoint for the request
318                 subs.EpList.DelEndpoint(trans.GetEndpoint())
319                 return nil, errorInfo, err
320         }
321
322         if newAlloc {
323                 r.register[subs.ReqId.InstanceId] = subs
324         }
325         xapp.Logger.Debug("CREATE %s", subs.String())
326         xapp.Logger.Debug("Registry: substable=%v", r.register)
327         return subs, errorInfo, nil
328 }
329
330 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
331         errorInfo := ErrorInfo{}
332         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
333         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
334         if err != nil {
335                 if strings.Contains(err.Error(), "status 400") {
336                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
337                 } else {
338                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
339                 }
340                 errorInfo.ErrorCause = err.Error()
341                 c.UpdateCounter(cRouteCreateFail)
342                 xapp.Logger.Error("%s", err.Error())
343                 err = fmt.Errorf("RTMGR route create failure")
344         }
345         return errorInfo, err
346 }
347
348 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
349         errorInfo := ErrorInfo{}
350         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
351         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
352         if err != nil {
353                 if strings.Contains(err.Error(), "status 400") {
354                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
355                 } else {
356                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
357                 }
358                 errorInfo.ErrorCause = err.Error()
359                 c.UpdateCounter(cRouteCreateUpdateFail)
360                 xapp.Logger.Error("%s", err.Error())
361                 err = fmt.Errorf("RTMGR route update failure")
362                 return errorInfo, err
363         }
364         c.UpdateCounter(cMergedSubscriptions)
365         return errorInfo, err
366 }
367
368 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
369         var reportFound bool = false
370         var policyFound bool = false
371         var insertFound bool = false
372
373         for _, acts := range subReqMsg.ActionSetups {
374                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
375                         reportFound = true
376                 }
377                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
378                         policyFound = true
379                 }
380                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
381                         insertFound = true
382                 }
383         }
384         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
385                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
386         }
387         if reportFound == true {
388                 return e2ap.E2AP_ActionTypeReport, nil
389         }
390         if policyFound == true {
391                 return e2ap.E2AP_ActionTypePolicy, nil
392         }
393         if insertFound == true {
394                 return e2ap.E2AP_ActionTypeInsert, nil
395         }
396         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
397 }
398
399 // TODO: Works with concurrent calls, but check if can be improved
400 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
401
402         r.mutex.Lock()
403         defer r.mutex.Unlock()
404         subs.mutex.Lock()
405         defer subs.mutex.Unlock()
406
407         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
408         epamount := subs.EpList.Size()
409         subId := subs.ReqId.InstanceId
410         if delStatus == false {
411                 return nil
412         }
413
414         go func() {
415                 if waitRouteClean > 0 {
416                         xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
417                         time.Sleep(waitRouteClean)
418                 }
419
420                 subs.mutex.Lock()
421                 defer subs.mutex.Unlock()
422                 xapp.Logger.Debug("CLEAN %s", subs.String())
423
424                 if epamount == 0 {
425                         //
426                         // Subscription route delete
427                         //
428                         if subs.RMRRouteCreated == true {
429                                 r.RouteDelete(subs, trans, c)
430                         }
431
432                         //
433                         // Subscription release
434                         //
435                         r.mutex.Lock()
436                         defer r.mutex.Unlock()
437
438                         if _, ok := r.register[subId]; ok {
439                                 xapp.Logger.Debug("RELEASE %s", subs.String())
440                                 delete(r.register, subId)
441                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
442                         }
443                         r.subIds = append(r.subIds, subId)
444                 } else if subs.EpList.Size() > 0 {
445                         //
446                         // Subscription route update
447                         //
448                         if subs.RMRRouteCreated == true {
449                                 r.RouteDeleteUpdate(subs, c)
450                         }
451                 }
452         }()
453
454         return nil
455 }
456
457 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
458         tmpList := xapp.RmrEndpointList{}
459         tmpList.AddEndpoint(trans.GetEndpoint())
460         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
461         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
462                 c.UpdateCounter(cRouteDeleteFail)
463         }
464 }
465
466 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
467         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
468         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
469                 c.UpdateCounter(cRouteDeleteUpdateFail)
470         }
471 }
472
473 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
474         r.mutex.Lock()
475         defer r.mutex.Unlock()
476         subs.mutex.Lock()
477         defer subs.mutex.Unlock()
478
479         epamount := subs.EpList.Size()
480         if epamount == 0 {
481                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
482                         // Not merged subscription is being deleted
483                         c.RemoveSubscriptionFromDb(subs)
484
485                 }
486         } else if subs.EpList.Size() > 0 {
487                 // Endpoint of merged subscription is being deleted
488                 c.WriteSubscriptionToDb(subs)
489                 c.UpdateCounter(cUnmergedSubscriptions)
490         }
491 }
492
493 func (r *Registry) GetSubscription(subId uint32) *Subscription {
494         r.mutex.Lock()
495         defer r.mutex.Unlock()
496         if _, ok := r.register[subId]; ok {
497                 return r.register[subId]
498         }
499         return nil
500 }
501
502 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
503         r.mutex.Lock()
504         defer r.mutex.Unlock()
505         for _, subId := range subIds {
506                 if _, ok := r.register[subId]; ok {
507                         return r.register[subId], nil
508                 }
509         }
510         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
511 }
512
513 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
514         if resetTestFlag == true {
515                 // This is used in submgr restart unit tests
516                 xapp.Logger.Debug("resetTestFlag == true")
517                 subs.DoNotWaitSubResp = true
518         } else {
519                 xapp.Logger.Debug("resetTestFlag == false")
520         }
521 }
522
523 func (r *Registry) DeleteAllE2Subscriptions(ranName string, c *Control) {
524
525         xapp.Logger.Debug("Registry: DeleteAllE2Subscriptions()")
526         for subId, subs := range r.register {
527                 if subs.Meid.RanName == ranName {
528                         if subs.OngoingReqCount != 0 || subs.OngoingDelCount != 0 {
529                                 // Subscription creation or deletion processes need to be processed gracefully till the end.
530                                 // Subscription is deleted at end of the process in both cases.
531                                 xapp.Logger.Debug("Registry: E2 subscription under prosessing ongoing cannot delete it yet. subId=%v, OngoingReqCount=%v, OngoingDelCount=%v", subId, subs.OngoingReqCount, subs.OngoingDelCount)
532                                 continue
533                         } else {
534                                 // Delete route
535                                 if subs.RMRRouteCreated == true {
536                                         for _, ep := range subs.EpList.Endpoints {
537                                                 tmpList := xapp.RmrEndpointList{}
538                                                 tmpList.AddEndpoint(&ep)
539                                                 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
540                                                 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
541                                                         c.UpdateCounter(cRouteDeleteFail)
542                                                 }
543                                         }
544                                 }
545                                 // Delete E2 subscription from registry and db
546                                 xapp.Logger.Debug("Registry: Subscription delete. subId=%v", subId)
547                                 delete(r.register, subId)
548                                 r.subIds = append(r.subIds, subId)
549                                 c.RemoveSubscriptionFromDb(subs)
550                         }
551                 }
552         }
553
554         // Delete REST subscription from registry and db
555         for restSubId, restSubs := range r.restSubscriptions {
556                 if restSubs.Meid == ranName && restSubs.SubReqOngoing == true || restSubs.SubDelReqOngoing == true {
557                         // Subscription creation or deletion processes need to be processed gracefully till the end.
558                         // Subscription is deleted at end of the process in both cases.
559                         xapp.Logger.Debug("Registry: REST subscription under prosessing ongoing cannot delete it yet. RestSubId=%v, SubReqOngoing=%v, SubDelReqOngoing=%v", restSubId, restSubs.SubReqOngoing, restSubs.SubDelReqOngoing)
560                         continue
561                 } else {
562                         xapp.Logger.Debug("Registry: REST subscription delete. subId=%v", restSubId)
563                         delete(r.restSubscriptions, restSubId)
564                         c.RemoveRESTSubscriptionFromDb(restSubId)
565                 }
566         }
567 }