72a210056efaf6f97d04f9cc779261ffad13ba12
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "fmt"
25         "sync"
26         "time"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 )
32
33 //-----------------------------------------------------------------------------
34 //
35 //-----------------------------------------------------------------------------
36
37 type RESTSubscription struct {
38         xAppRmrEndPoint  string
39         Meid             string
40         InstanceIds      []uint32
41         xAppIdToE2Id     map[int64]int64
42         SubReqOngoing    bool
43         SubDelReqOngoing bool
44         lastReqMd5sum    string
45 }
46
47 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
48
49         for _, v := range r.InstanceIds {
50                 if v == instanceId {
51                         return
52                 }
53
54         }
55
56         r.InstanceIds = append(r.InstanceIds, instanceId)
57 }
58
59 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
60         if md5sum != "" {
61                 r.lastReqMd5sum = md5sum
62         } else {
63                 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
64         }
65 }
66
67 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
68         r.InstanceIds = r.InstanceIds[1:]
69 }
70
71 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
72         r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
73 }
74
75 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
76         return r.xAppIdToE2Id[xAppEventInstanceID]
77 }
78
79 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
80         delete(r.xAppIdToE2Id, xAppEventInstanceID)
81 }
82
83 func (r *RESTSubscription) SetProcessed(err error) {
84         r.SubReqOngoing = false
85         if err != nil {
86                 r.lastReqMd5sum = ""
87         }
88 }
89
90 type Registry struct {
91         mutex             sync.Mutex
92         register          map[uint32]*Subscription
93         subIds            []uint32
94         rtmgrClient       *RtmgrClient
95         restSubscriptions map[string]*RESTSubscription
96 }
97
98 func (r *Registry) Initialize() {
99         r.register = make(map[uint32]*Subscription)
100         r.restSubscriptions = make(map[string]*RESTSubscription)
101
102         var i uint32
103         for i = 1; i < 65535; i++ {
104                 r.subIds = append(r.subIds, i)
105         }
106 }
107
108 func (r *Registry) GetAllRestSubscriptions() []byte {
109         r.mutex.Lock()
110         defer r.mutex.Unlock()
111         restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
112         if err != nil {
113                 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
114         }
115         return restSubscriptionsJson
116 }
117
118 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
119         r.mutex.Lock()
120         defer r.mutex.Unlock()
121         newRestSubscription := RESTSubscription{}
122         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
123         newRestSubscription.Meid = *maid
124         newRestSubscription.SubReqOngoing = true
125         newRestSubscription.SubDelReqOngoing = false
126         r.restSubscriptions[*restSubId] = &newRestSubscription
127         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
128         xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
129         return &newRestSubscription, nil
130 }
131
132 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
133         r.mutex.Lock()
134         defer r.mutex.Unlock()
135         delete(r.restSubscriptions, *restSubId)
136         xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
137 }
138
139 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
140         r.mutex.Lock()
141         defer r.mutex.Unlock()
142         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
143                 // Subscription deletion is not allowed if prosessing subscription request in not ready
144                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
145                         if IsDelReqOngoing == true {
146                                 restSubscription.SubDelReqOngoing = true
147                         }
148                         r.restSubscriptions[restSubId] = restSubscription
149                         return restSubscription, nil
150                 } else {
151                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
152                 }
153                 return restSubscription, nil
154         }
155         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
156 }
157
158 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
159         r.mutex.Lock()
160         defer r.mutex.Unlock()
161
162         resp := models.SubscriptionList{}
163         for _, subs := range r.register {
164                 subs.mutex.Lock()
165                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
166                 subs.mutex.Unlock()
167         }
168         return resp, nil
169 }
170
171 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
172         if len(r.subIds) > 0 {
173                 subId := r.subIds[0]
174                 r.subIds = r.subIds[1:]
175                 if _, ok := r.register[subId]; ok == true {
176                         r.subIds = append(r.subIds, subId)
177                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
178                 }
179                 subs := &Subscription{
180                         registry:         r,
181                         Meid:             trans.Meid,
182                         SubReqMsg:        subReqMsg,
183                         valid:            true,
184                         RetryFromXapp:    false,
185                         SubRespRcvd:      false,
186                         DeleteFromDb:     false,
187                         NoRespToXapp:     false,
188                         DoNotWaitSubResp: false,
189                 }
190                 subs.ReqId.Id = subReqMsg.RequestId.Id
191                 subs.ReqId.InstanceId = subId
192                 if resetTestFlag == true {
193                         subs.DoNotWaitSubResp = true
194                 }
195
196                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
197                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
198                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
199                 }
200                 return subs, nil
201         }
202         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
203 }
204
205 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
206
207         for _, subs := range r.register {
208                 if subs.IsMergeable(trans, subReqMsg) {
209
210                         //
211                         // check if there has been race conditions
212                         //
213                         subs.mutex.Lock()
214                         //subs has been set to invalid
215                         if subs.valid == false {
216                                 subs.mutex.Unlock()
217                                 continue
218                         }
219                         // If size is zero, entry is to be deleted
220                         if subs.EpList.Size() == 0 {
221                                 subs.mutex.Unlock()
222                                 continue
223                         }
224                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
225                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
226                                 subs.mutex.Unlock()
227                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
228                                 return subs, true
229                         }
230                         subs.mutex.Unlock()
231
232                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
233                         return subs, false
234                 }
235         }
236         return nil, false
237 }
238
239 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
240         var err error
241         var newAlloc bool
242         r.mutex.Lock()
243         defer r.mutex.Unlock()
244
245         //
246         // Check validity of subscription action types
247         //
248         actionType, err := r.CheckActionTypes(subReqMsg)
249         if err != nil {
250                 xapp.Logger.Info("CREATE %s", err)
251                 err = fmt.Errorf("E2 content validation failed")
252                 return nil, err
253         }
254
255         //
256         // Find possible existing Policy subscription
257         //
258         if actionType == e2ap.E2AP_ActionTypePolicy {
259                 if subs, ok := r.register[trans.GetSubId()]; ok {
260                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
261                         // Update message data to subscription
262                         subs.SubReqMsg = subReqMsg
263                         subs.SetCachedResponse(nil, true)
264                         return subs, nil
265                 }
266         }
267
268         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
269         if subs == nil {
270                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
271                         xapp.Logger.Error("%s", err.Error())
272                         err = fmt.Errorf("subscription not allocated")
273                         return nil, err
274                 }
275                 newAlloc = true
276         } else if endPointFound == true {
277                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
278                 subs.RetryFromXapp = true
279                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
280                 c.UpdateCounter(cDuplicateE2SubReq)
281                 return subs, nil
282         }
283
284         //
285         // Add to subscription
286         //
287         subs.mutex.Lock()
288         defer subs.mutex.Unlock()
289
290         epamount := subs.EpList.Size()
291         xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
292
293         r.mutex.Unlock()
294         //
295         // Subscription route updates
296         //
297         if epamount == 1 {
298                 err = r.RouteCreate(subs, c)
299         } else {
300                 err = r.RouteCreateUpdate(subs, c)
301         }
302         r.mutex.Lock()
303
304         if err != nil {
305                 if newAlloc {
306                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
307                 }
308                 // Delete already added endpoint for the request
309                 subs.EpList.DelEndpoint(trans.GetEndpoint())
310                 return nil, err
311         }
312
313         if newAlloc {
314                 r.register[subs.ReqId.InstanceId] = subs
315         }
316         xapp.Logger.Debug("CREATE %s", subs.String())
317         xapp.Logger.Debug("Registry: substable=%v", r.register)
318         return subs, nil
319 }
320
321 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
322         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
323         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
324         if err != nil {
325                 c.UpdateCounter(cRouteCreateFail)
326                 xapp.Logger.Error("%s", err.Error())
327                 err = fmt.Errorf("RTMGR route create failure")
328         }
329         return err
330 }
331
332 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
333         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
334         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
335         if err != nil {
336                 c.UpdateCounter(cRouteCreateUpdateFail)
337                 xapp.Logger.Error("%s", err.Error())
338                 err = fmt.Errorf("RTMGR route update failure")
339                 return err
340         }
341         c.UpdateCounter(cMergedSubscriptions)
342         return err
343 }
344
345 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
346         var reportFound bool = false
347         var policyFound bool = false
348         var insertFound bool = false
349
350         for _, acts := range subReqMsg.ActionSetups {
351                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
352                         reportFound = true
353                 }
354                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
355                         policyFound = true
356                 }
357                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
358                         insertFound = true
359                 }
360         }
361         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
362                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
363         }
364         if reportFound == true {
365                 return e2ap.E2AP_ActionTypeReport, nil
366         }
367         if policyFound == true {
368                 return e2ap.E2AP_ActionTypePolicy, nil
369         }
370         if insertFound == true {
371                 return e2ap.E2AP_ActionTypeInsert, nil
372         }
373         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
374 }
375
376 // TODO: Works with concurrent calls, but check if can be improved
377 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
378
379         r.mutex.Lock()
380         defer r.mutex.Unlock()
381         subs.mutex.Lock()
382         defer subs.mutex.Unlock()
383
384         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
385         epamount := subs.EpList.Size()
386         subId := subs.ReqId.InstanceId
387
388         if delStatus == false {
389                 return nil
390         }
391
392         go func() {
393                 if waitRouteClean > 0 {
394                         xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
395                         time.Sleep(waitRouteClean)
396                 }
397
398                 subs.mutex.Lock()
399                 defer subs.mutex.Unlock()
400                 xapp.Logger.Info("CLEAN %s", subs.String())
401
402                 if epamount == 0 {
403                         //
404                         // Subscription route delete
405                         //
406                         r.RouteDelete(subs, trans, c)
407
408                         //
409                         // Subscription release
410                         //
411                         r.mutex.Lock()
412                         defer r.mutex.Unlock()
413
414                         if _, ok := r.register[subId]; ok {
415                                 xapp.Logger.Debug("RELEASE %s", subs.String())
416                                 delete(r.register, subId)
417                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
418                         }
419                         r.subIds = append(r.subIds, subId)
420                 } else if subs.EpList.Size() > 0 {
421                         //
422                         // Subscription route update
423                         //
424                         r.RouteDeleteUpdate(subs, c)
425                 }
426         }()
427
428         return nil
429 }
430
431 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
432         tmpList := xapp.RmrEndpointList{}
433         tmpList.AddEndpoint(trans.GetEndpoint())
434         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
435         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
436                 c.UpdateCounter(cRouteDeleteFail)
437         }
438 }
439
440 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
441         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
442         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
443                 c.UpdateCounter(cRouteDeleteUpdateFail)
444         }
445 }
446
447 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
448         r.mutex.Lock()
449         defer r.mutex.Unlock()
450         subs.mutex.Lock()
451         defer subs.mutex.Unlock()
452
453         epamount := subs.EpList.Size()
454         if epamount == 0 {
455                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
456                         // Not merged subscription is being deleted
457                         c.RemoveSubscriptionFromDb(subs)
458
459                 }
460         } else if subs.EpList.Size() > 0 {
461                 // Endpoint of merged subscription is being deleted
462                 c.WriteSubscriptionToDb(subs)
463                 c.UpdateCounter(cUnmergedSubscriptions)
464         }
465 }
466
467 func (r *Registry) GetSubscription(subId uint32) *Subscription {
468         r.mutex.Lock()
469         defer r.mutex.Unlock()
470         if _, ok := r.register[subId]; ok {
471                 return r.register[subId]
472         }
473         return nil
474 }
475
476 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
477         r.mutex.Lock()
478         defer r.mutex.Unlock()
479         for _, subId := range subIds {
480                 if _, ok := r.register[subId]; ok {
481                         return r.register[subId], nil
482                 }
483         }
484         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
485 }