671cf5197a832d6cebfd7475a6ef8bec6d8c9912
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "sync"
25         "time"
26
27         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
// RESTSubscription holds the registry's bookkeeping for one subscription that
// was created through the REST interface.
type RESTSubscription struct {
	// xAppRmrEndPoint is the endpoint string given to CreateRESTSubscription.
	xAppRmrEndPoint string
	// Meid is the value given as maid to CreateRESTSubscription.
	Meid string
	// InstanceIds lists the E2 instance ids attached to this REST subscription.
	InstanceIds []uint32
	// xAppIdToE2Id maps an xApp event instance id to an E2 event instance id.
	xAppIdToE2Id map[int64]int64
	// SubReqOngoing is true from creation until SetProcessed is called.
	SubReqOngoing bool
	// SubDelReqOngoing is set by Registry.GetRESTSubscription when a delete
	// request claims the subscription.
	SubDelReqOngoing bool
	// Md5sum is cleared by SetProcessed.
	// NOTE(review): presumably a digest of the pending request - confirm with the writer of this field.
	Md5sum string
}

// AddE2InstanceId appends instanceId to the list of E2 instance ids.
func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
	r.InstanceIds = append(r.InstanceIds, instanceId)
}

// DeleteE2InstanceId removes the first occurrence of instanceId from the list
// of E2 instance ids. An unknown id or an empty list is a no-op.
func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
	for i, id := range r.InstanceIds {
		if id == instanceId {
			r.InstanceIds = append(r.InstanceIds[:i], r.InstanceIds[i+1:]...)
			return
		}
	}
}

// AddXappIdToE2Id records the E2 event instance id that corresponds to the
// given xApp event instance id, replacing any previous mapping.
func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
	// A zero-value RESTSubscription has a nil map; writing to it would panic.
	if r.xAppIdToE2Id == nil {
		r.xAppIdToE2Id = make(map[int64]int64)
	}
	r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
}

// GetE2IdFromXappIdToE2Id returns the E2 event instance id mapped to the given
// xApp event instance id, or 0 when no mapping exists.
func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
	return r.xAppIdToE2Id[xAppEventInstanceID]
}

// DeleteXappIdToE2Id removes the mapping for the given xApp event instance id.
// Deleting a missing key is a no-op.
func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
	delete(r.xAppIdToE2Id, xAppEventInstanceID)
}

// SetProcessed marks the subscription request as finished: it clears the
// SubReqOngoing flag and resets Md5sum.
func (r *RESTSubscription) SetProcessed() {
	r.SubReqOngoing = false
	r.Md5sum = ""
}
70
// Registry keeps track of the E2 subscriptions and REST subscriptions known to
// this process and owns the pool of free subscription instance ids.
type Registry struct {
	// mutex is taken by the exported Registry methods before they touch the
	// maps and the id pool below.
	mutex             sync.Mutex
	// register maps a subscription instance id (Subscription.ReqId.InstanceId)
	// to its Subscription.
	register          map[uint32]*Subscription
	// subIds is the pool of free instance ids: allocateSubs takes from the
	// front, released ids are appended to the back.
	subIds            []uint32
	// rtmgrClient is used to send subscription route create/update/delete
	// requests.
	rtmgrClient       *RtmgrClient
	// restSubscriptions maps a REST subscription id to its RESTSubscription.
	restSubscriptions map[string]*RESTSubscription
}
78
79 func (r *Registry) Initialize() {
80         r.register = make(map[uint32]*Subscription)
81         r.restSubscriptions = make(map[string]*RESTSubscription)
82
83         var i uint32
84         for i = 1; i < 65535; i++ {
85                 r.subIds = append(r.subIds, i)
86         }
87 }
88
89 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
90         r.mutex.Lock()
91         defer r.mutex.Unlock()
92         newRestSubscription := RESTSubscription{}
93         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
94         newRestSubscription.Meid = *maid
95         newRestSubscription.SubReqOngoing = true
96         newRestSubscription.SubDelReqOngoing = false
97         r.restSubscriptions[*restSubId] = &newRestSubscription
98         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
99         xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
100         return &newRestSubscription, nil
101 }
102
103 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
104         r.mutex.Lock()
105         defer r.mutex.Unlock()
106         delete(r.restSubscriptions, *restSubId)
107         xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
108 }
109
110 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
111         r.mutex.Lock()
112         defer r.mutex.Unlock()
113         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
114                 // Subscription deletion is not allowed if prosessing subscription request in not ready
115                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
116                         if IsDelReqOngoing == true {
117                                 restSubscription.SubDelReqOngoing = true
118                         }
119                         r.restSubscriptions[restSubId] = restSubscription
120                         return restSubscription, nil
121                 } else {
122                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
123                 }
124                 return restSubscription, nil
125         }
126         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
127 }
128
129 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
130         r.mutex.Lock()
131         defer r.mutex.Unlock()
132
133         resp := models.SubscriptionList{}
134         for _, subs := range r.register {
135                 subs.mutex.Lock()
136                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
137                 subs.mutex.Unlock()
138         }
139         return resp, nil
140 }
141
142 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
143         if len(r.subIds) > 0 {
144                 subId := r.subIds[0]
145                 r.subIds = r.subIds[1:]
146                 if _, ok := r.register[subId]; ok == true {
147                         r.subIds = append(r.subIds, subId)
148                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
149                 }
150                 subs := &Subscription{
151                         registry:         r,
152                         Meid:             trans.Meid,
153                         SubReqMsg:        subReqMsg,
154                         valid:            true,
155                         RetryFromXapp:    false,
156                         SubRespRcvd:      false,
157                         DeleteFromDb:     false,
158                         NoRespToXapp:     false,
159                         DoNotWaitSubResp: false,
160                 }
161                 subs.ReqId.Id = subReqMsg.RequestId.Id
162                 subs.ReqId.InstanceId = subId
163                 if resetTestFlag == true {
164                         subs.DoNotWaitSubResp = true
165                 }
166
167                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
168                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
169                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
170                 }
171                 return subs, nil
172         }
173         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
174 }
175
176 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
177
178         for _, subs := range r.register {
179                 if subs.IsMergeable(trans, subReqMsg) {
180
181                         //
182                         // check if there has been race conditions
183                         //
184                         subs.mutex.Lock()
185                         //subs has been set to invalid
186                         if subs.valid == false {
187                                 subs.mutex.Unlock()
188                                 continue
189                         }
190                         // If size is zero, entry is to be deleted
191                         if subs.EpList.Size() == 0 {
192                                 subs.mutex.Unlock()
193                                 continue
194                         }
195                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
196                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
197                                 subs.mutex.Unlock()
198                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
199                                 return subs, true
200                         }
201                         subs.mutex.Unlock()
202
203                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
204                         return subs, false
205                 }
206         }
207         return nil, false
208 }
209
// AssignToSubscription attaches the requesting transaction to a subscription
// and returns that subscription.
//
// Steps, in order:
//  1. Validate the action types of subReqMsg (CheckActionTypes).
//  2. For a Policy request, reuse the subscription registered under
//     trans.GetSubId() if there is one, replacing its stored request message.
//  3. Otherwise look for a mergeable subscription (findExistingSubs); if none
//     exists allocate a new one (allocateSubs). If the endpoint is already
//     part of an existing subscription, flag it as a retry and return it.
//  4. Create (first endpoint) or update (additional endpoint) the route, and
//     on success insert a newly allocated subscription into the register.
//
// On failure nil and an error are returned; the validation and allocation
// errors are replaced by generic messages after the original has been logged.
// If the route request fails, a newly allocated id is returned to the pool and
// the endpoint added for this request is removed again.
//
// Locking: r.mutex is held for the whole call except while the route request
// is in flight; subs.mutex is held from step 4 until return.
func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
	var err error
	var newAlloc bool
	r.mutex.Lock()
	defer r.mutex.Unlock()

	//
	// Check validity of subscription action types
	//
	actionType, err := r.CheckActionTypes(subReqMsg)
	if err != nil {
		xapp.Logger.Info("CREATE %s", err)
		// The detailed reason is only logged; the caller gets a generic error.
		err = fmt.Errorf("E2 content validation failed")
		return nil, err
	}

	//
	// Find possible existing Policy subscription
	//
	if actionType == e2ap.E2AP_ActionTypePolicy {
		if subs, ok := r.register[trans.GetSubId()]; ok {
			xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
			// Update message data to subscription
			subs.SubReqMsg = subReqMsg
			subs.SetCachedResponse(nil, true)
			return subs, nil
		}
	}

	// findExistingSubs has already added the endpoint to the returned
	// subscription unless it reports endPointFound.
	subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
	if subs == nil {
		if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
			xapp.Logger.Error("%s", err.Error())
			// The detailed reason is only logged; the caller gets a generic error.
			err = fmt.Errorf("subscription not allocated")
			return nil, err
		}
		// Remember that the id came from the pool so it can be returned on failure
		// and the subscription registered on success.
		newAlloc = true
	} else if endPointFound == true {
		// Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
		subs.RetryFromXapp = true
		xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
		c.UpdateCounter(cDuplicateE2SubReq)
		return subs, nil
	}

	//
	// Add to subscription
	//
	subs.mutex.Lock()
	defer subs.mutex.Unlock()

	epamount := subs.EpList.Size()
	xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())

	// Release the registry lock while the route request is in flight. It is
	// re-acquired below so that the deferred Unlock stays balanced.
	r.mutex.Unlock()
	//
	// Subscription route updates
	//
	// Exactly one endpoint means this request is the first user of the
	// subscription, so the route is created; otherwise it is updated.
	if epamount == 1 {
		err = r.RouteCreate(subs, c)
	} else {
		err = r.RouteCreateUpdate(subs, c)
	}
	r.mutex.Lock()

	if err != nil {
		if newAlloc {
			// Give the unused instance id back to the pool.
			r.subIds = append(r.subIds, subs.ReqId.InstanceId)
		}
		// Delete already added endpoint for the request
		subs.EpList.DelEndpoint(trans.GetEndpoint())
		return nil, err
	}

	// Only a freshly allocated subscription needs to be inserted; merged ones
	// are already in the register.
	if newAlloc {
		r.register[subs.ReqId.InstanceId] = subs
	}
	xapp.Logger.Debug("CREATE %s", subs.String())
	xapp.Logger.Debug("Registry: substable=%v", r.register)
	return subs, nil
}
291
292 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
293         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
294         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
295         if err != nil {
296                 c.UpdateCounter(cRouteCreateFail)
297                 xapp.Logger.Error("%s", err.Error())
298                 err = fmt.Errorf("RTMGR route create failure")
299         }
300         return err
301 }
302
303 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
304         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
305         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
306         if err != nil {
307                 c.UpdateCounter(cRouteCreateUpdateFail)
308                 xapp.Logger.Error("%s", err.Error())
309                 err = fmt.Errorf("RTMGR route update failure")
310                 return err
311         }
312         c.UpdateCounter(cMergedSubscriptions)
313         return err
314 }
315
316 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
317         var reportFound bool = false
318         var policyFound bool = false
319         var insertFound bool = false
320
321         for _, acts := range subReqMsg.ActionSetups {
322                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
323                         reportFound = true
324                 }
325                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
326                         policyFound = true
327                 }
328                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
329                         insertFound = true
330                 }
331         }
332         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
333                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
334         }
335         if reportFound == true {
336                 return e2ap.E2AP_ActionTypeReport, nil
337         }
338         if policyFound == true {
339                 return e2ap.E2AP_ActionTypePolicy, nil
340         }
341         if insertFound == true {
342                 return e2ap.E2AP_ActionTypeInsert, nil
343         }
344         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
345 }
346
// RemoveFromSubscription detaches the transaction's endpoint from subs and
// schedules the follow-up route cleanup in a background goroutine.
//
// The endpoint is removed synchronously under r.mutex and subs.mutex. If the
// endpoint was not part of the subscription nothing else happens. Otherwise a
// goroutine is started which, after optionally sleeping for waitRouteClean,
// either deletes the route and releases the subscription (no endpoints were
// left at the time of removal) or updates the route (endpoints remain).
//
// The returned error is always nil; failures of the route requests are only
// counted by RouteDelete / RouteDeleteUpdate.
//
// TODO: Works with concurrent calls, but check if can be improved
func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {

	r.mutex.Lock()
	defer r.mutex.Unlock()
	subs.mutex.Lock()
	defer subs.mutex.Unlock()

	delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
	// Snapshot taken now; the goroutine below decides on this value even
	// though it runs later, after the locks above have been released.
	epamount := subs.EpList.Size()
	subId := subs.ReqId.InstanceId

	// Endpoint was not in the list: nothing to clean up.
	if delStatus == false {
		return nil
	}

	go func() {
		if waitRouteClean > 0 {
			xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
			time.Sleep(waitRouteClean)
		}

		// NOTE(review): this goroutine takes subs.mutex first and r.mutex
		// second (below), the opposite order of the enclosing function and of
		// AssignToSubscription - verify this cannot deadlock.
		subs.mutex.Lock()
		defer subs.mutex.Unlock()
		xapp.Logger.Info("CLEAN %s", subs.String())

		if epamount == 0 {
			//
			// Subscription route delete
			//
			r.RouteDelete(subs, trans, c)

			//
			// Subscription release
			//
			r.mutex.Lock()
			defer r.mutex.Unlock()

			if _, ok := r.register[subId]; ok {
				xapp.Logger.Debug("RELEASE %s", subs.String())
				delete(r.register, subId)
				xapp.Logger.Debug("Registry: substable=%v", r.register)
			}
			// The instance id goes back to the end of the free-id pool, whether
			// or not the subscription was found in the register.
			r.subIds = append(r.subIds, subId)
		} else if subs.EpList.Size() > 0 {
			//
			// Subscription route update
			//
			r.RouteDeleteUpdate(subs, c)
		}
	}()

	return nil
}
401
402 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
403         tmpList := xapp.RmrEndpointList{}
404         tmpList.AddEndpoint(trans.GetEndpoint())
405         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
406         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
407                 c.UpdateCounter(cRouteDeleteFail)
408         }
409 }
410
411 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
412         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
413         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
414                 c.UpdateCounter(cRouteDeleteUpdateFail)
415         }
416 }
417
418 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
419         r.mutex.Lock()
420         defer r.mutex.Unlock()
421         subs.mutex.Lock()
422         defer subs.mutex.Unlock()
423
424         epamount := subs.EpList.Size()
425         if epamount == 0 {
426                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
427                         // Not merged subscription is being deleted
428                         c.RemoveSubscriptionFromDb(subs)
429
430                 }
431         } else if subs.EpList.Size() > 0 {
432                 // Endpoint of merged subscription is being deleted
433                 c.WriteSubscriptionToDb(subs)
434                 c.UpdateCounter(cUnmergedSubscriptions)
435         }
436 }
437
438 func (r *Registry) GetSubscription(subId uint32) *Subscription {
439         r.mutex.Lock()
440         defer r.mutex.Unlock()
441         if _, ok := r.register[subId]; ok {
442                 return r.register[subId]
443         }
444         return nil
445 }
446
447 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
448         r.mutex.Lock()
449         defer r.mutex.Unlock()
450         for _, subId := range subIds {
451                 if _, ok := r.register[subId]; ok {
452                         return r.register[subId], nil
453                 }
454         }
455         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
456 }