Added duplicate detection changes
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "sync"
25         "time"
26
27         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
36 type RESTSubscription struct {
37         xAppRmrEndPoint  string
38         Meid             string
39         InstanceIds      []uint32
40         xAppIdToE2Id     map[int64]int64
41         SubReqOngoing    bool
42         SubDelReqOngoing bool
43         lastReqMd5sum    string
44 }
45
46 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
47
48         for _, v := range r.InstanceIds {
49                 if v == instanceId {
50                         return
51                 }
52
53         }
54
55         r.InstanceIds = append(r.InstanceIds, instanceId)
56 }
57
58 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
59         if md5sum != "" {
60                 r.lastReqMd5sum = md5sum
61         } else {
62                 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
63         }
64 }
65
66 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
67         r.InstanceIds = r.InstanceIds[1:]
68 }
69
70 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
71         r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
72 }
73
74 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
75         return r.xAppIdToE2Id[xAppEventInstanceID]
76 }
77
78 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
79         delete(r.xAppIdToE2Id, xAppEventInstanceID)
80 }
81
82 func (r *RESTSubscription) SetProcessed(err error) {
83         r.SubReqOngoing = false
84         if err != nil {
85                 r.lastReqMd5sum = ""
86         }
87 }
88
89 type Registry struct {
90         mutex             sync.Mutex
91         register          map[uint32]*Subscription
92         subIds            []uint32
93         rtmgrClient       *RtmgrClient
94         restSubscriptions map[string]*RESTSubscription
95 }
96
97 func (r *Registry) Initialize() {
98         r.register = make(map[uint32]*Subscription)
99         r.restSubscriptions = make(map[string]*RESTSubscription)
100
101         var i uint32
102         for i = 1; i < 65535; i++ {
103                 r.subIds = append(r.subIds, i)
104         }
105 }
106
107 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
108         r.mutex.Lock()
109         defer r.mutex.Unlock()
110         newRestSubscription := RESTSubscription{}
111         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
112         newRestSubscription.Meid = *maid
113         newRestSubscription.SubReqOngoing = true
114         newRestSubscription.SubDelReqOngoing = false
115         r.restSubscriptions[*restSubId] = &newRestSubscription
116         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
117         xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
118         return &newRestSubscription, nil
119 }
120
121 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
122         r.mutex.Lock()
123         defer r.mutex.Unlock()
124         delete(r.restSubscriptions, *restSubId)
125         xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
126 }
127
128 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
129         r.mutex.Lock()
130         defer r.mutex.Unlock()
131         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
132                 // Subscription deletion is not allowed if prosessing subscription request in not ready
133                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
134                         if IsDelReqOngoing == true {
135                                 restSubscription.SubDelReqOngoing = true
136                         }
137                         r.restSubscriptions[restSubId] = restSubscription
138                         return restSubscription, nil
139                 } else {
140                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
141                 }
142                 return restSubscription, nil
143         }
144         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
145 }
146
147 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
148         r.mutex.Lock()
149         defer r.mutex.Unlock()
150
151         resp := models.SubscriptionList{}
152         for _, subs := range r.register {
153                 subs.mutex.Lock()
154                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
155                 subs.mutex.Unlock()
156         }
157         return resp, nil
158 }
159
160 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
161         if len(r.subIds) > 0 {
162                 subId := r.subIds[0]
163                 r.subIds = r.subIds[1:]
164                 if _, ok := r.register[subId]; ok == true {
165                         r.subIds = append(r.subIds, subId)
166                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
167                 }
168                 subs := &Subscription{
169                         registry:         r,
170                         Meid:             trans.Meid,
171                         SubReqMsg:        subReqMsg,
172                         valid:            true,
173                         RetryFromXapp:    false,
174                         SubRespRcvd:      false,
175                         DeleteFromDb:     false,
176                         NoRespToXapp:     false,
177                         DoNotWaitSubResp: false,
178                 }
179                 subs.ReqId.Id = subReqMsg.RequestId.Id
180                 subs.ReqId.InstanceId = subId
181                 if resetTestFlag == true {
182                         subs.DoNotWaitSubResp = true
183                 }
184
185                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
186                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
187                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
188                 }
189                 return subs, nil
190         }
191         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
192 }
193
194 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
195
196         for _, subs := range r.register {
197                 if subs.IsMergeable(trans, subReqMsg) {
198
199                         //
200                         // check if there has been race conditions
201                         //
202                         subs.mutex.Lock()
203                         //subs has been set to invalid
204                         if subs.valid == false {
205                                 subs.mutex.Unlock()
206                                 continue
207                         }
208                         // If size is zero, entry is to be deleted
209                         if subs.EpList.Size() == 0 {
210                                 subs.mutex.Unlock()
211                                 continue
212                         }
213                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
214                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
215                                 subs.mutex.Unlock()
216                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
217                                 return subs, true
218                         }
219                         subs.mutex.Unlock()
220
221                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
222                         return subs, false
223                 }
224         }
225         return nil, false
226 }
227
228 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
229         var err error
230         var newAlloc bool
231         r.mutex.Lock()
232         defer r.mutex.Unlock()
233
234         //
235         // Check validity of subscription action types
236         //
237         actionType, err := r.CheckActionTypes(subReqMsg)
238         if err != nil {
239                 xapp.Logger.Info("CREATE %s", err)
240                 err = fmt.Errorf("E2 content validation failed")
241                 return nil, err
242         }
243
244         //
245         // Find possible existing Policy subscription
246         //
247         if actionType == e2ap.E2AP_ActionTypePolicy {
248                 if subs, ok := r.register[trans.GetSubId()]; ok {
249                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
250                         // Update message data to subscription
251                         subs.SubReqMsg = subReqMsg
252                         subs.SetCachedResponse(nil, true)
253                         return subs, nil
254                 }
255         }
256
257         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
258         if subs == nil {
259                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
260                         xapp.Logger.Error("%s", err.Error())
261                         err = fmt.Errorf("subscription not allocated")
262                         return nil, err
263                 }
264                 newAlloc = true
265         } else if endPointFound == true {
266                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
267                 subs.RetryFromXapp = true
268                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
269                 c.UpdateCounter(cDuplicateE2SubReq)
270                 return subs, nil
271         }
272
273         //
274         // Add to subscription
275         //
276         subs.mutex.Lock()
277         defer subs.mutex.Unlock()
278
279         epamount := subs.EpList.Size()
280         xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
281
282         r.mutex.Unlock()
283         //
284         // Subscription route updates
285         //
286         if epamount == 1 {
287                 err = r.RouteCreate(subs, c)
288         } else {
289                 err = r.RouteCreateUpdate(subs, c)
290         }
291         r.mutex.Lock()
292
293         if err != nil {
294                 if newAlloc {
295                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
296                 }
297                 // Delete already added endpoint for the request
298                 subs.EpList.DelEndpoint(trans.GetEndpoint())
299                 return nil, err
300         }
301
302         if newAlloc {
303                 r.register[subs.ReqId.InstanceId] = subs
304         }
305         xapp.Logger.Debug("CREATE %s", subs.String())
306         xapp.Logger.Debug("Registry: substable=%v", r.register)
307         return subs, nil
308 }
309
310 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
311         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
312         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
313         if err != nil {
314                 c.UpdateCounter(cRouteCreateFail)
315                 xapp.Logger.Error("%s", err.Error())
316                 err = fmt.Errorf("RTMGR route create failure")
317         }
318         return err
319 }
320
321 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
322         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
323         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
324         if err != nil {
325                 c.UpdateCounter(cRouteCreateUpdateFail)
326                 xapp.Logger.Error("%s", err.Error())
327                 err = fmt.Errorf("RTMGR route update failure")
328                 return err
329         }
330         c.UpdateCounter(cMergedSubscriptions)
331         return err
332 }
333
334 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
335         var reportFound bool = false
336         var policyFound bool = false
337         var insertFound bool = false
338
339         for _, acts := range subReqMsg.ActionSetups {
340                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
341                         reportFound = true
342                 }
343                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
344                         policyFound = true
345                 }
346                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
347                         insertFound = true
348                 }
349         }
350         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
351                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
352         }
353         if reportFound == true {
354                 return e2ap.E2AP_ActionTypeReport, nil
355         }
356         if policyFound == true {
357                 return e2ap.E2AP_ActionTypePolicy, nil
358         }
359         if insertFound == true {
360                 return e2ap.E2AP_ActionTypeInsert, nil
361         }
362         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
363 }
364
365 // TODO: Works with concurrent calls, but check if can be improved
366 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
367
368         r.mutex.Lock()
369         defer r.mutex.Unlock()
370         subs.mutex.Lock()
371         defer subs.mutex.Unlock()
372
373         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
374         epamount := subs.EpList.Size()
375         subId := subs.ReqId.InstanceId
376
377         if delStatus == false {
378                 return nil
379         }
380
381         go func() {
382                 if waitRouteClean > 0 {
383                         xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
384                         time.Sleep(waitRouteClean)
385                 }
386
387                 subs.mutex.Lock()
388                 defer subs.mutex.Unlock()
389                 xapp.Logger.Info("CLEAN %s", subs.String())
390
391                 if epamount == 0 {
392                         //
393                         // Subscription route delete
394                         //
395                         r.RouteDelete(subs, trans, c)
396
397                         //
398                         // Subscription release
399                         //
400                         r.mutex.Lock()
401                         defer r.mutex.Unlock()
402
403                         if _, ok := r.register[subId]; ok {
404                                 xapp.Logger.Debug("RELEASE %s", subs.String())
405                                 delete(r.register, subId)
406                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
407                         }
408                         r.subIds = append(r.subIds, subId)
409                 } else if subs.EpList.Size() > 0 {
410                         //
411                         // Subscription route update
412                         //
413                         r.RouteDeleteUpdate(subs, c)
414                 }
415         }()
416
417         return nil
418 }
419
420 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
421         tmpList := xapp.RmrEndpointList{}
422         tmpList.AddEndpoint(trans.GetEndpoint())
423         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
424         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
425                 c.UpdateCounter(cRouteDeleteFail)
426         }
427 }
428
429 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
430         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
431         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
432                 c.UpdateCounter(cRouteDeleteUpdateFail)
433         }
434 }
435
436 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
437         r.mutex.Lock()
438         defer r.mutex.Unlock()
439         subs.mutex.Lock()
440         defer subs.mutex.Unlock()
441
442         epamount := subs.EpList.Size()
443         if epamount == 0 {
444                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
445                         // Not merged subscription is being deleted
446                         c.RemoveSubscriptionFromDb(subs)
447
448                 }
449         } else if subs.EpList.Size() > 0 {
450                 // Endpoint of merged subscription is being deleted
451                 c.WriteSubscriptionToDb(subs)
452                 c.UpdateCounter(cUnmergedSubscriptions)
453         }
454 }
455
456 func (r *Registry) GetSubscription(subId uint32) *Subscription {
457         r.mutex.Lock()
458         defer r.mutex.Unlock()
459         if _, ok := r.register[subId]; ok {
460                 return r.register[subId]
461         }
462         return nil
463 }
464
465 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
466         r.mutex.Lock()
467         defer r.mutex.Unlock()
468         for _, subId := range subIds {
469                 if _, ok := r.register[subId]; ok {
470                         return r.register[subId], nil
471                 }
472         }
473         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
474 }