Subscription REST interface update
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "fmt"
25         "strings"
26         "sync"
27         "time"
28
29         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 )
33
34 //-----------------------------------------------------------------------------
35 //
36 //-----------------------------------------------------------------------------
37
38 type RESTSubscription struct {
39         xAppRmrEndPoint  string
40         Meid             string
41         InstanceIds      []uint32
42         xAppIdToE2Id     map[int64]int64
43         SubReqOngoing    bool
44         SubDelReqOngoing bool
45         lastReqMd5sum    string
46 }
47
48 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
49
50         for _, v := range r.InstanceIds {
51                 if v == instanceId {
52                         return
53                 }
54
55         }
56
57         r.InstanceIds = append(r.InstanceIds, instanceId)
58 }
59
60 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
61         if md5sum != "" {
62                 r.lastReqMd5sum = md5sum
63         } else {
64                 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
65         }
66 }
67
68 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
69         r.InstanceIds = r.InstanceIds[1:]
70 }
71
72 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
73         r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
74 }
75
76 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
77         return r.xAppIdToE2Id[xAppEventInstanceID]
78 }
79
80 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
81         delete(r.xAppIdToE2Id, xAppEventInstanceID)
82 }
83
84 func (r *RESTSubscription) SetProcessed(err error) {
85         r.SubReqOngoing = false
86         if err != nil {
87                 r.lastReqMd5sum = ""
88         }
89 }
90
91 type Registry struct {
92         mutex             sync.Mutex
93         register          map[uint32]*Subscription
94         subIds            []uint32
95         rtmgrClient       *RtmgrClient
96         restSubscriptions map[string]*RESTSubscription
97 }
98
99 func (r *Registry) Initialize() {
100         r.register = make(map[uint32]*Subscription)
101         r.restSubscriptions = make(map[string]*RESTSubscription)
102
103         var i uint32
104         for i = 1; i < 65535; i++ {
105                 r.subIds = append(r.subIds, i)
106         }
107 }
108
109 func (r *Registry) GetAllRestSubscriptions() []byte {
110         r.mutex.Lock()
111         defer r.mutex.Unlock()
112         restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
113         if err != nil {
114                 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
115         }
116         return restSubscriptionsJson
117 }
118
119 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
120         r.mutex.Lock()
121         defer r.mutex.Unlock()
122         newRestSubscription := RESTSubscription{}
123         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
124         newRestSubscription.Meid = *maid
125         newRestSubscription.SubReqOngoing = true
126         newRestSubscription.SubDelReqOngoing = false
127         r.restSubscriptions[*restSubId] = &newRestSubscription
128         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
129         xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
130         return &newRestSubscription
131 }
132
133 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
134         r.mutex.Lock()
135         defer r.mutex.Unlock()
136         delete(r.restSubscriptions, *restSubId)
137         xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
138 }
139
140 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
141         r.mutex.Lock()
142         defer r.mutex.Unlock()
143         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
144                 // Subscription deletion is not allowed if prosessing subscription request in not ready
145                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
146                         if IsDelReqOngoing == true {
147                                 restSubscription.SubDelReqOngoing = true
148                         }
149                         r.restSubscriptions[restSubId] = restSubscription
150                         return restSubscription, nil
151                 } else {
152                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
153                 }
154         }
155         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
156 }
157
158 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
159         r.mutex.Lock()
160         defer r.mutex.Unlock()
161
162         resp := models.SubscriptionList{}
163         for _, subs := range r.register {
164                 subs.mutex.Lock()
165                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
166                 subs.mutex.Unlock()
167         }
168         return resp, nil
169 }
170
171 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
172         if len(r.subIds) > 0 {
173                 subId := r.subIds[0]
174                 r.subIds = r.subIds[1:]
175                 if _, ok := r.register[subId]; ok == true {
176                         r.subIds = append(r.subIds, subId)
177                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
178                 }
179                 subs := &Subscription{
180                         registry:         r,
181                         Meid:             trans.Meid,
182                         RMRRouteCreated:  rmrRoutecreated,
183                         SubReqMsg:        subReqMsg,
184                         valid:            true,
185                         PolicyUpdate:     false,
186                         RetryFromXapp:    false,
187                         SubRespRcvd:      false,
188                         DeleteFromDb:     false,
189                         NoRespToXapp:     false,
190                         DoNotWaitSubResp: false,
191                 }
192                 subs.ReqId.Id = subReqMsg.RequestId.Id
193                 subs.ReqId.InstanceId = subId
194                 r.SetResetTestFlag(resetTestFlag, subs)
195
196                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
197                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
198                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
199                 }
200                 return subs, nil
201         }
202         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
203 }
204
205 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
206
207         for _, subs := range r.register {
208                 if subs.IsMergeable(trans, subReqMsg) {
209
210                         //
211                         // check if there has been race conditions
212                         //
213                         subs.mutex.Lock()
214                         //subs has been set to invalid
215                         if subs.valid == false {
216                                 subs.mutex.Unlock()
217                                 continue
218                         }
219                         // If size is zero, entry is to be deleted
220                         if subs.EpList.Size() == 0 {
221                                 subs.mutex.Unlock()
222                                 continue
223                         }
224                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
225                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
226                                 subs.mutex.Unlock()
227                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
228                                 return subs, true
229                         }
230                         subs.mutex.Unlock()
231
232                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
233                         return subs, false
234                 }
235         }
236         return nil, false
237 }
238
239 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
240         var err error
241         var newAlloc bool
242         errorInfo := ErrorInfo{}
243         r.mutex.Lock()
244         defer r.mutex.Unlock()
245
246         //
247         // Check validity of subscription action types
248         //
249         actionType, err := r.CheckActionTypes(subReqMsg)
250         if err != nil {
251                 xapp.Logger.Debug("CREATE %s", err)
252                 err = fmt.Errorf("E2 content validation failed")
253                 return nil, errorInfo, err
254         }
255
256         //
257         // Find possible existing Policy subscription
258         //
259         if actionType == e2ap.E2AP_ActionTypePolicy {
260                 if subs, ok := r.register[trans.GetSubId()]; ok {
261                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
262                         // Update message data to subscription
263                         subs.SubReqMsg = subReqMsg
264                         subs.PolicyUpdate = true
265                         subs.SetCachedResponse(nil, true)
266                         r.SetResetTestFlag(resetTestFlag, subs)
267                         return subs, errorInfo, nil
268                 }
269         }
270
271         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
272         if subs == nil {
273                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
274                         xapp.Logger.Error("%s", err.Error())
275                         err = fmt.Errorf("subscription not allocated")
276                         return nil, errorInfo, err
277                 }
278                 newAlloc = true
279         } else if endPointFound == true {
280                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
281                 subs.RetryFromXapp = true
282                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
283                 c.UpdateCounter(cDuplicateE2SubReq)
284                 return subs, errorInfo, nil
285         }
286
287         //
288         // Add to subscription
289         //
290         subs.mutex.Lock()
291         defer subs.mutex.Unlock()
292
293         epamount := subs.EpList.Size()
294         xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
295
296         r.mutex.Unlock()
297         //
298         // Subscription route updates
299         //
300         if createRMRRoute == true {
301                 if epamount == 1 {
302                         errorInfo, err = r.RouteCreate(subs, c)
303                 } else {
304                         errorInfo, err = r.RouteCreateUpdate(subs, c)
305                 }
306         } else {
307                 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
308         }
309         r.mutex.Lock()
310
311         if err != nil {
312                 if newAlloc {
313                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
314                 }
315                 // Delete already added endpoint for the request
316                 subs.EpList.DelEndpoint(trans.GetEndpoint())
317                 return nil, errorInfo, err
318         }
319
320         if newAlloc {
321                 r.register[subs.ReqId.InstanceId] = subs
322         }
323         xapp.Logger.Debug("CREATE %s", subs.String())
324         xapp.Logger.Debug("Registry: substable=%v", r.register)
325         return subs, errorInfo, nil
326 }
327
328 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
329         errorInfo := ErrorInfo{}
330         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
331         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
332         if err != nil {
333                 if strings.Contains(err.Error(), "status 400") {
334                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
335                 } else {
336                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
337                 }
338                 errorInfo.ErrorCause = err.Error()
339                 c.UpdateCounter(cRouteCreateFail)
340                 xapp.Logger.Error("%s", err.Error())
341                 err = fmt.Errorf("RTMGR route create failure")
342         }
343         return errorInfo, err
344 }
345
346 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
347         errorInfo := ErrorInfo{}
348         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
349         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
350         if err != nil {
351                 if strings.Contains(err.Error(), "status 400") {
352                         errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
353                 } else {
354                         errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
355                 }
356                 errorInfo.ErrorCause = err.Error()
357                 c.UpdateCounter(cRouteCreateUpdateFail)
358                 xapp.Logger.Error("%s", err.Error())
359                 err = fmt.Errorf("RTMGR route update failure")
360                 return errorInfo, err
361         }
362         c.UpdateCounter(cMergedSubscriptions)
363         return errorInfo, err
364 }
365
366 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
367         var reportFound bool = false
368         var policyFound bool = false
369         var insertFound bool = false
370
371         for _, acts := range subReqMsg.ActionSetups {
372                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
373                         reportFound = true
374                 }
375                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
376                         policyFound = true
377                 }
378                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
379                         insertFound = true
380                 }
381         }
382         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
383                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
384         }
385         if reportFound == true {
386                 return e2ap.E2AP_ActionTypeReport, nil
387         }
388         if policyFound == true {
389                 return e2ap.E2AP_ActionTypePolicy, nil
390         }
391         if insertFound == true {
392                 return e2ap.E2AP_ActionTypeInsert, nil
393         }
394         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
395 }
396
397 // TODO: Works with concurrent calls, but check if can be improved
398 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
399
400         r.mutex.Lock()
401         defer r.mutex.Unlock()
402         subs.mutex.Lock()
403         defer subs.mutex.Unlock()
404
405         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
406         epamount := subs.EpList.Size()
407         subId := subs.ReqId.InstanceId
408
409         if delStatus == false {
410                 return nil
411         }
412
413         go func() {
414                 if waitRouteClean > 0 {
415                         xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
416                         time.Sleep(waitRouteClean)
417                 }
418
419                 subs.mutex.Lock()
420                 defer subs.mutex.Unlock()
421                 xapp.Logger.Debug("CLEAN %s", subs.String())
422
423                 if epamount == 0 {
424                         //
425                         // Subscription route delete
426                         //
427                         if subs.RMRRouteCreated == true {
428                                 r.RouteDelete(subs, trans, c)
429                         }
430
431                         //
432                         // Subscription release
433                         //
434                         r.mutex.Lock()
435                         defer r.mutex.Unlock()
436
437                         if _, ok := r.register[subId]; ok {
438                                 xapp.Logger.Debug("RELEASE %s", subs.String())
439                                 delete(r.register, subId)
440                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
441                         }
442                         r.subIds = append(r.subIds, subId)
443                 } else if subs.EpList.Size() > 0 {
444                         //
445                         // Subscription route update
446                         //
447                         if subs.RMRRouteCreated == true {
448                                 r.RouteDeleteUpdate(subs, c)
449                         }
450                 }
451         }()
452
453         return nil
454 }
455
456 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
457         tmpList := xapp.RmrEndpointList{}
458         tmpList.AddEndpoint(trans.GetEndpoint())
459         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
460         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
461                 c.UpdateCounter(cRouteDeleteFail)
462         }
463 }
464
465 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
466         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
467         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
468                 c.UpdateCounter(cRouteDeleteUpdateFail)
469         }
470 }
471
472 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
473         r.mutex.Lock()
474         defer r.mutex.Unlock()
475         subs.mutex.Lock()
476         defer subs.mutex.Unlock()
477
478         epamount := subs.EpList.Size()
479         if epamount == 0 {
480                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
481                         // Not merged subscription is being deleted
482                         c.RemoveSubscriptionFromDb(subs)
483
484                 }
485         } else if subs.EpList.Size() > 0 {
486                 // Endpoint of merged subscription is being deleted
487                 c.WriteSubscriptionToDb(subs)
488                 c.UpdateCounter(cUnmergedSubscriptions)
489         }
490 }
491
492 func (r *Registry) GetSubscription(subId uint32) *Subscription {
493         r.mutex.Lock()
494         defer r.mutex.Unlock()
495         if _, ok := r.register[subId]; ok {
496                 return r.register[subId]
497         }
498         return nil
499 }
500
501 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
502         r.mutex.Lock()
503         defer r.mutex.Unlock()
504         for _, subId := range subIds {
505                 if _, ok := r.register[subId]; ok {
506                         return r.register[subId], nil
507                 }
508         }
509         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
510 }
511
512 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
513         if resetTestFlag == true {
514                 // This is used in submgr restart unit tests
515                 xapp.Logger.Debug("resetTestFlag == true")
516                 subs.DoNotWaitSubResp = true
517         } else {
518                 xapp.Logger.Debug("resetTestFlag == false")
519         }
520 }