Fixes for subscription merge release cases
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "fmt"
25         "strings"
26         "sync"
27         "time"
28
29         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
38 type RESTSubscription struct {
39         xAppRmrEndPoint  string
40         Meid             string
41         InstanceIds      []uint32
42         xAppIdToE2Id     map[int64]int64
43         SubReqOngoing    bool
44         SubDelReqOngoing bool
45         lastReqMd5sum    string
46 }
47
48 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
49
50         for _, v := range r.InstanceIds {
51                 if v == instanceId {
52                         return
53                 }
54
55         }
56
57         r.InstanceIds = append(r.InstanceIds, instanceId)
58 }
59
60 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
61         if md5sum != "" {
62                 r.lastReqMd5sum = md5sum
63         } else {
64                 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
65         }
66 }
67
68 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
69         r.InstanceIds = r.InstanceIds[1:]
70 }
71
72 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
73         r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
74 }
75
76 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
77         return r.xAppIdToE2Id[xAppEventInstanceID]
78 }
79
80 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
81         delete(r.xAppIdToE2Id, xAppEventInstanceID)
82 }
83
84 func (r *RESTSubscription) SetProcessed(err error) {
85         r.SubReqOngoing = false
86         if err != nil {
87                 r.lastReqMd5sum = ""
88         }
89 }
90
91 type Registry struct {
92         mutex             *sync.Mutex
93         register          map[uint32]*Subscription
94         subIds            []uint32
95         rtmgrClient       *RtmgrClient
96         restSubscriptions map[string]*RESTSubscription
97 }
98
99 func (r *Registry) Initialize() {
100         r.mutex = new(sync.Mutex)
101         r.register = make(map[uint32]*Subscription)
102         r.restSubscriptions = make(map[string]*RESTSubscription)
103
104         var i uint32
105         for i = 1; i < 65535; i++ {
106                 r.subIds = append(r.subIds, i)
107         }
108 }
109
110 func (r *Registry) GetAllRestSubscriptions() []byte {
111         r.mutex.Lock()
112         defer r.mutex.Unlock()
113         restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
114         if err != nil {
115                 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
116         }
117         return restSubscriptionsJson
118 }
119
120 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
121         r.mutex.Lock()
122         defer r.mutex.Unlock()
123         newRestSubscription := RESTSubscription{}
124         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
125         newRestSubscription.Meid = *maid
126         newRestSubscription.SubReqOngoing = true
127         newRestSubscription.SubDelReqOngoing = false
128         r.restSubscriptions[*restSubId] = &newRestSubscription
129         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
130         xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
131         return &newRestSubscription
132 }
133
134 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
135         r.mutex.Lock()
136         defer r.mutex.Unlock()
137         delete(r.restSubscriptions, *restSubId)
138         xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
139 }
140
141 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
142         r.mutex.Lock()
143         defer r.mutex.Unlock()
144         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
145                 // Subscription deletion is not allowed if prosessing subscription request in not ready
146                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
147                         if IsDelReqOngoing == true {
148                                 restSubscription.SubDelReqOngoing = true
149                         }
150                         r.restSubscriptions[restSubId] = restSubscription
151                         return restSubscription, nil
152                 } else {
153                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
154                 }
155         }
156         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
157 }
158
159 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
160         r.mutex.Lock()
161         defer r.mutex.Unlock()
162
163         resp := models.SubscriptionList{}
164         for _, subs := range r.register {
165                 subs.mutex.Lock()
166                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
167                 subs.mutex.Unlock()
168         }
169         return resp, nil
170 }
171
172 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
173         if len(r.subIds) > 0 {
174                 subId := r.subIds[0]
175                 r.subIds = r.subIds[1:]
176                 if _, ok := r.register[subId]; ok == true {
177                         r.subIds = append(r.subIds, subId)
178                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
179                 }
180                 subs := &Subscription{
181                         registry:         r,
182                         Meid:             trans.Meid,
183                         RMRRouteCreated:  rmrRoutecreated,
184                         SubReqMsg:        subReqMsg,
185                         OngoingReqCount:  0,
186                         OngoingDelCount:  0,
187                         valid:            true,
188                         PolicyUpdate:     false,
189                         RetryFromXapp:    false,
190                         SubRespRcvd:      false,
191                         DeleteFromDb:     false,
192                         NoRespToXapp:     false,
193                         DoNotWaitSubResp: false,
194                 }
195                 subs.ReqId.Id = subReqMsg.RequestId.Id
196                 subs.ReqId.InstanceId = subId
197                 r.SetResetTestFlag(resetTestFlag, subs)
198
199                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
200                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
201                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
202                 }
203                 return subs, nil
204         }
205         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
206 }
207
208 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
209
210         for _, subs := range r.register {
211                 if subs.IsMergeable(trans, subReqMsg) {
212
213                         //
214                         // check if there has been race conditions
215                         //
216                         subs.mutex.Lock()
217                         //subs has been set to invalid
218                         if subs.valid == false {
219                                 subs.mutex.Unlock()
220                                 continue
221                         }
222                         // If size is zero, entry is to be deleted
223                         if subs.EpList.Size() == 0 {
224                                 subs.mutex.Unlock()
225                                 continue
226                         }
227                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
228                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
229                                 subs.mutex.Unlock()
230                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
231                                 return subs, true
232                         }
233                         subs.mutex.Unlock()
234
235                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
236                         return subs, false
237                 }
238         }
239         return nil, false
240 }
241
242 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
243         var err error
244         var newAlloc bool
245         errorInfo := ErrorInfo{}
246         r.mutex.Lock()
247         defer r.mutex.Unlock()
248
249         //
250         // Check validity of subscription action types
251         //
252         actionType, err := r.CheckActionTypes(subReqMsg)
253         if err != nil {
254                 xapp.Logger.Debug("CREATE %s", err)
255                 err = fmt.Errorf("E2 content validation failed")
256                 return nil, errorInfo, err
257         }
258
259         //
260         // Find possible existing Policy subscription
261         //
262         if actionType == e2ap.E2AP_ActionTypePolicy {
263                 if subs, ok := r.register[trans.GetSubId()]; ok {
264                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
265                         // Update message data to subscription
266                         subs.SubReqMsg = subReqMsg
267                         subs.PolicyUpdate = true
268                         subs.SetCachedResponse(nil, true)
269                         r.SetResetTestFlag(resetTestFlag, subs)
270                         return subs, errorInfo, nil
271                 }
272         }
273
274         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
275         if subs == nil {
276                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
277                         xapp.Logger.Error("%s", err.Error())
278                         err = fmt.Errorf("subscription not allocated")
279                         return nil, errorInfo, err
280                 }
281                 newAlloc = true
282         } else if endPointFound == true {
283                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
284                 subs.RetryFromXapp = true
285                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
286                 c.UpdateCounter(cDuplicateE2SubReq)
287                 return subs, errorInfo, nil
288         }
289
290         //
291         // Add to subscription
292         //
293         subs.mutex.Lock()
294         defer subs.mutex.Unlock()
295
296         epamount := subs.EpList.Size()
297         xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
298
299         r.mutex.Unlock()
300         //
301         // Subscription route updates
302         //
303         if createRMRRoute == true {
304                 if epamount == 1 {
305                         errorInfo, err = r.RouteCreate(subs, c)
306                 } else {
307                         errorInfo, err = r.RouteCreateUpdate(subs, c)
308                 }
309         } else {
310                 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
311         }
312         r.mutex.Lock()
313
314         if err != nil {
315                 if newAlloc {
316                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
317                 }
318                 // Delete already added endpoint for the request
319                 subs.EpList.DelEndpoint(trans.GetEndpoint())
320                 return nil, errorInfo, err
321         }
322
323         if newAlloc {
324                 r.register[subs.ReqId.InstanceId] = subs
325         }
326         xapp.Logger.Debug("CREATE %s", subs.String())
327         xapp.Logger.Debug("Registry: substable=%v", r.register)
328         return subs, errorInfo, nil
329 }
330
331 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
332         errorInfo := ErrorInfo{}
333         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
334         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
335         if err != nil {
336                 if strings.Contains(err.Error(), "status 400") {
337                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
338                 } else {
339                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
340                 }
341                 errorInfo.ErrorCause = err.Error()
342                 c.UpdateCounter(cRouteCreateFail)
343                 xapp.Logger.Error("%s", err.Error())
344                 err = fmt.Errorf("RTMGR route create failure")
345         }
346         return errorInfo, err
347 }
348
349 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
350         errorInfo := ErrorInfo{}
351         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
352         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
353         if err != nil {
354                 if strings.Contains(err.Error(), "status 400") {
355                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
356                 } else {
357                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
358                 }
359                 errorInfo.ErrorCause = err.Error()
360                 c.UpdateCounter(cRouteCreateUpdateFail)
361                 xapp.Logger.Error("%s", err.Error())
362                 err = fmt.Errorf("RTMGR route update failure")
363                 return errorInfo, err
364         }
365         c.UpdateCounter(cMergedSubscriptions)
366         return errorInfo, err
367 }
368
369 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
370         var reportFound bool = false
371         var policyFound bool = false
372         var insertFound bool = false
373
374         for _, acts := range subReqMsg.ActionSetups {
375                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
376                         reportFound = true
377                 }
378                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
379                         policyFound = true
380                 }
381                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
382                         insertFound = true
383                 }
384         }
385         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
386                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
387         }
388         if reportFound == true {
389                 return e2ap.E2AP_ActionTypeReport, nil
390         }
391         if policyFound == true {
392                 return e2ap.E2AP_ActionTypePolicy, nil
393         }
394         if insertFound == true {
395                 return e2ap.E2AP_ActionTypeInsert, nil
396         }
397         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
398 }
399
400 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
401
402         xapp.Logger.Debug("RemoveFromSubscription %s", idstring(nil, trans, subs, trans))
403         r.mutex.Lock()
404         defer r.mutex.Unlock()
405         subs.mutex.Lock()
406         defer subs.mutex.Unlock()
407
408         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
409         epamount := subs.EpList.Size()
410
411         subId := subs.ReqId.InstanceId
412         if delStatus == false {
413                 return nil
414         }
415
416         if waitRouteClean > 0 {
417                 // Wait here that response is delivered to xApp via RMR before route is cleaned
418                 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
419                 time.Sleep(waitRouteClean)
420         }
421
422         xapp.Logger.Debug("CLEAN %s", subs.String())
423
424         if epamount == 0 {
425                 //
426                 // Subscription route delete
427                 //
428                 if subs.RMRRouteCreated == true {
429                         r.RouteDelete(subs, trans, c)
430                 }
431
432                 // Not merged subscription is being deleted
433                 xapp.Logger.Debug("Subscription route delete RemoveSubscriptionFromDb")
434                 c.RemoveSubscriptionFromDb(subs)
435
436                 //
437                 // Subscription release
438                 //
439
440                 if _, ok := r.register[subId]; ok {
441                         xapp.Logger.Debug("RELEASE %s", subs.String())
442                         delete(r.register, subId)
443                         xapp.Logger.Debug("Registry: substable=%v", r.register)
444                 }
445                 r.subIds = append(r.subIds, subId)
446         } else if subs.EpList.Size() > 0 {
447                 //
448                 // Subscription route update
449                 //
450                 if subs.RMRRouteCreated == true {
451                         r.RouteDeleteUpdate(subs, c)
452                 }
453
454                 // Endpoint of merged subscription is being deleted
455                 xapp.Logger.Debug("Subscription route update WriteSubscriptionToDb")
456                 c.WriteSubscriptionToDb(subs)
457                 c.UpdateCounter(cUnmergedSubscriptions)
458         }
459         return nil
460 }
461
462 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
463         tmpList := xapp.RmrEndpointList{}
464         tmpList.AddEndpoint(trans.GetEndpoint())
465         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
466         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
467                 c.UpdateCounter(cRouteDeleteFail)
468         }
469 }
470
471 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
472         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
473         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
474                 c.UpdateCounter(cRouteDeleteUpdateFail)
475         }
476 }
477
478 func (r *Registry) GetSubscription(subId uint32) *Subscription {
479         r.mutex.Lock()
480         defer r.mutex.Unlock()
481         if _, ok := r.register[subId]; ok {
482                 return r.register[subId]
483         }
484         return nil
485 }
486
487 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
488         r.mutex.Lock()
489         defer r.mutex.Unlock()
490         for _, subId := range subIds {
491                 if _, ok := r.register[subId]; ok {
492                         return r.register[subId], nil
493                 }
494         }
495         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
496 }
497
498 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
499         if resetTestFlag == true {
500                 // This is used in submgr restart unit tests
501                 xapp.Logger.Debug("resetTestFlag == true")
502                 subs.DoNotWaitSubResp = true
503         } else {
504                 xapp.Logger.Debug("resetTestFlag == false")
505         }
506 }
507
508 func (r *Registry) DeleteAllE2Subscriptions(ranName string, c *Control) {
509
510         xapp.Logger.Debug("Registry: DeleteAllE2Subscriptions()")
511         for subId, subs := range r.register {
512                 if subs.Meid.RanName == ranName {
513                         if subs.OngoingReqCount != 0 || subs.OngoingDelCount != 0 {
514                                 // Subscription creation or deletion processes need to be processed gracefully till the end.
515                                 // Subscription is deleted at end of the process in both cases.
516                                 xapp.Logger.Debug("Registry: E2 subscription under prosessing ongoing cannot delete it yet. subId=%v, OngoingReqCount=%v, OngoingDelCount=%v", subId, subs.OngoingReqCount, subs.OngoingDelCount)
517                                 continue
518                         } else {
519                                 // Delete route
520                                 if subs.RMRRouteCreated == true {
521                                         for _, ep := range subs.EpList.Endpoints {
522                                                 tmpList := xapp.RmrEndpointList{}
523                                                 tmpList.AddEndpoint(&ep)
524                                                 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
525                                                 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
526                                                         c.UpdateCounter(cRouteDeleteFail)
527                                                 }
528                                         }
529                                 }
530                                 // Delete E2 subscription from registry and db
531                                 xapp.Logger.Debug("Registry: Subscription delete. subId=%v", subId)
532                                 delete(r.register, subId)
533                                 r.subIds = append(r.subIds, subId)
534                                 c.RemoveSubscriptionFromDb(subs)
535                         }
536                 }
537         }
538
539         // Delete REST subscription from registry and db
540         for restSubId, restSubs := range r.restSubscriptions {
541                 if restSubs.Meid == ranName && restSubs.SubReqOngoing == true || restSubs.SubDelReqOngoing == true {
542                         // Subscription creation or deletion processes need to be processed gracefully till the end.
543                         // Subscription is deleted at end of the process in both cases.
544                         xapp.Logger.Debug("Registry: REST subscription under prosessing ongoing cannot delete it yet. RestSubId=%v, SubReqOngoing=%v, SubDelReqOngoing=%v", restSubId, restSubs.SubReqOngoing, restSubs.SubDelReqOngoing)
545                         continue
546                 } else {
547                         xapp.Logger.Debug("Registry: REST subscription delete. subId=%v", restSubId)
548                         delete(r.restSubscriptions, restSubId)
549                         c.RemoveRESTSubscriptionFromDb(restSubId)
550                 }
551         }
552 }