416b4156116633bfcf6938220ecd3b2ad81e4595
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "sync"
25         "time"
26
27         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
// RESTSubscription is the registry's bookkeeping record for one subscription
// that was created through the REST interface.
type RESTSubscription struct {
	// xAppRmrEndPoint is the RMR endpoint string given when the record is created.
	xAppRmrEndPoint string

	// Meid is stored as given when the record is created.
	Meid string

	// InstanceIds holds the E2 instance ids added with AddE2InstanceId.
	InstanceIds []uint32

	// xAppIdToE2Id maps an xApp event instance id to an E2 event instance id.
	xAppIdToE2Id map[int64]int64

	// SubReqOngoing is set to true when the record is created and cleared by
	// SetProcessed.
	SubReqOngoing bool

	// SubDelReqOngoing is set to true by Registry.GetRESTSubscription when it
	// is called with IsDelReqOngoing == true.
	SubDelReqOngoing bool
}
44
45 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
46         r.InstanceIds = append(r.InstanceIds, instanceId)
47 }
48
49 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
50         r.InstanceIds = r.InstanceIds[1:]
51 }
52
53 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
54         r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
55 }
56
57 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
58         return r.xAppIdToE2Id[xAppEventInstanceID]
59 }
60
61 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
62         delete(r.xAppIdToE2Id, xAppEventInstanceID)
63 }
64
65 func (r *RESTSubscription) SetProcessed() {
66         r.SubReqOngoing = false
67 }
68
69 type Registry struct {
70         mutex             sync.Mutex
71         register          map[uint32]*Subscription
72         subIds            []uint32
73         rtmgrClient       *RtmgrClient
74         restSubscriptions map[string]*RESTSubscription
75 }
76
77 func (r *Registry) Initialize() {
78         r.register = make(map[uint32]*Subscription)
79         r.restSubscriptions = make(map[string]*RESTSubscription)
80
81         var i uint32
82         for i = 1; i < 65535; i++ {
83                 r.subIds = append(r.subIds, i)
84         }
85 }
86
87 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
88         r.mutex.Lock()
89         defer r.mutex.Unlock()
90         newRestSubscription := RESTSubscription{}
91         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
92         newRestSubscription.Meid = *maid
93         newRestSubscription.SubReqOngoing = true
94         newRestSubscription.SubDelReqOngoing = false
95         r.restSubscriptions[*restSubId] = &newRestSubscription
96         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
97         xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
98         return &newRestSubscription, nil
99 }
100
101 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
102         r.mutex.Lock()
103         defer r.mutex.Unlock()
104         delete(r.restSubscriptions, *restSubId)
105         xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
106 }
107
108 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
109         r.mutex.Lock()
110         defer r.mutex.Unlock()
111         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
112                 // Subscription deletion is not allowed if prosessing subscription request in not ready
113                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
114                         if IsDelReqOngoing == true {
115                                 restSubscription.SubDelReqOngoing = true
116                         }
117                         r.restSubscriptions[restSubId] = restSubscription
118                         return restSubscription, nil
119                 } else {
120                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
121                 }
122                 return restSubscription, nil
123         }
124         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
125 }
126
127 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
128         r.mutex.Lock()
129         defer r.mutex.Unlock()
130
131         resp := models.SubscriptionList{}
132         for _, subs := range r.register {
133                 subs.mutex.Lock()
134                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
135                 subs.mutex.Unlock()
136         }
137         return resp, nil
138 }
139
140 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
141         if len(r.subIds) > 0 {
142                 subId := r.subIds[0]
143                 r.subIds = r.subIds[1:]
144                 if _, ok := r.register[subId]; ok == true {
145                         r.subIds = append(r.subIds, subId)
146                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
147                 }
148                 subs := &Subscription{
149                         registry:         r,
150                         Meid:             trans.Meid,
151                         SubReqMsg:        subReqMsg,
152                         valid:            true,
153                         RetryFromXapp:    false,
154                         SubRespRcvd:      false,
155                         DeleteFromDb:     false,
156                         NoRespToXapp:     false,
157                         DoNotWaitSubResp: false,
158                 }
159                 subs.ReqId.Id = subReqMsg.RequestId.Id
160                 subs.ReqId.InstanceId = subId
161                 if resetTestFlag == true {
162                         subs.DoNotWaitSubResp = true
163                 }
164
165                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
166                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
167                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
168                 }
169                 return subs, nil
170         }
171         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
172 }
173
174 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
175
176         for _, subs := range r.register {
177                 if subs.IsMergeable(trans, subReqMsg) {
178
179                         //
180                         // check if there has been race conditions
181                         //
182                         subs.mutex.Lock()
183                         //subs has been set to invalid
184                         if subs.valid == false {
185                                 subs.mutex.Unlock()
186                                 continue
187                         }
188                         // If size is zero, entry is to be deleted
189                         if subs.EpList.Size() == 0 {
190                                 subs.mutex.Unlock()
191                                 continue
192                         }
193                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
194                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
195                                 subs.mutex.Unlock()
196                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
197                                 return subs, true
198                         }
199                         subs.mutex.Unlock()
200
201                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
202                         return subs, false
203                 }
204         }
205         return nil, false
206 }
207
208 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
209         var err error
210         var newAlloc bool
211         r.mutex.Lock()
212         defer r.mutex.Unlock()
213
214         //
215         // Check validity of subscription action types
216         //
217         actionType, err := r.CheckActionTypes(subReqMsg)
218         if err != nil {
219                 xapp.Logger.Debug("CREATE %s", err)
220                 return nil, err
221         }
222
223         //
224         // Find possible existing Policy subscription
225         //
226         if actionType == e2ap.E2AP_ActionTypePolicy {
227                 if subs, ok := r.register[trans.GetSubId()]; ok {
228                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
229                         // Update message data to subscription
230                         subs.SubReqMsg = subReqMsg
231                         subs.SetCachedResponse(nil, true)
232                         return subs, nil
233                 }
234         }
235
236         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
237         if subs == nil {
238                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
239                         return nil, err
240                 }
241                 newAlloc = true
242         } else if endPointFound == true {
243                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
244                 subs.RetryFromXapp = true
245                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
246                 c.UpdateCounter(cDuplicateE2SubReq)
247                 return subs, nil
248         }
249
250         //
251         // Add to subscription
252         //
253         subs.mutex.Lock()
254         defer subs.mutex.Unlock()
255
256         epamount := subs.EpList.Size()
257         xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
258
259         r.mutex.Unlock()
260         //
261         // Subscription route updates
262         //
263         if epamount == 1 {
264                 err = r.RouteCreate(subs, c)
265         } else {
266                 err = r.RouteCreateUpdate(subs, c)
267         }
268         r.mutex.Lock()
269
270         if err != nil {
271                 if newAlloc {
272                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
273                 }
274                 // Delete already added endpoint for the request
275                 subs.EpList.DelEndpoint(trans.GetEndpoint())
276                 return nil, err
277         }
278
279         if newAlloc {
280                 r.register[subs.ReqId.InstanceId] = subs
281         }
282         xapp.Logger.Debug("CREATE %s", subs.String())
283         xapp.Logger.Debug("Registry: substable=%v", r.register)
284         return subs, nil
285 }
286
287 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
288         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
289         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
290         if err != nil {
291                 c.UpdateCounter(cRouteCreateFail)
292         }
293         return err
294 }
295
296 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
297         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
298         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
299         if err != nil {
300                 c.UpdateCounter(cRouteCreateUpdateFail)
301                 return err
302         }
303         c.UpdateCounter(cMergedSubscriptions)
304         return err
305 }
306
307 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
308         var reportFound bool = false
309         var policyFound bool = false
310         var insertFound bool = false
311
312         for _, acts := range subReqMsg.ActionSetups {
313                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
314                         reportFound = true
315                 }
316                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
317                         policyFound = true
318                 }
319                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
320                         insertFound = true
321                 }
322         }
323         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
324                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
325         }
326         if reportFound == true {
327                 return e2ap.E2AP_ActionTypeReport, nil
328         }
329         if policyFound == true {
330                 return e2ap.E2AP_ActionTypePolicy, nil
331         }
332         if insertFound == true {
333                 return e2ap.E2AP_ActionTypeInsert, nil
334         }
335         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
336 }
337
338 // TODO: Works with concurrent calls, but check if can be improved
339 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
340
341         r.mutex.Lock()
342         defer r.mutex.Unlock()
343         subs.mutex.Lock()
344         defer subs.mutex.Unlock()
345
346         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
347         epamount := subs.EpList.Size()
348         subId := subs.ReqId.InstanceId
349
350         if delStatus == false {
351                 return nil
352         }
353
354         go func() {
355                 if waitRouteClean > 0 {
356                         xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
357                         time.Sleep(waitRouteClean)
358                 }
359
360                 subs.mutex.Lock()
361                 defer subs.mutex.Unlock()
362                 xapp.Logger.Info("CLEAN %s", subs.String())
363
364                 if epamount == 0 {
365                         //
366                         // Subscription route delete
367                         //
368                         r.RouteDelete(subs, trans, c)
369
370                         //
371                         // Subscription release
372                         //
373                         r.mutex.Lock()
374                         defer r.mutex.Unlock()
375
376                         if _, ok := r.register[subId]; ok {
377                                 xapp.Logger.Debug("RELEASE %s", subs.String())
378                                 delete(r.register, subId)
379                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
380                         }
381                         r.subIds = append(r.subIds, subId)
382                 } else if subs.EpList.Size() > 0 {
383                         //
384                         // Subscription route update
385                         //
386                         r.RouteDeleteUpdate(subs, c)
387                 }
388         }()
389
390         return nil
391 }
392
393 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
394         tmpList := xapp.RmrEndpointList{}
395         tmpList.AddEndpoint(trans.GetEndpoint())
396         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
397         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
398                 c.UpdateCounter(cRouteDeleteFail)
399         }
400 }
401
402 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
403         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
404         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
405                 c.UpdateCounter(cRouteDeleteUpdateFail)
406         }
407 }
408
409 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
410         r.mutex.Lock()
411         defer r.mutex.Unlock()
412         subs.mutex.Lock()
413         defer subs.mutex.Unlock()
414
415         epamount := subs.EpList.Size()
416         if epamount == 0 {
417                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
418                         // Not merged subscription is being deleted
419                         c.RemoveSubscriptionFromDb(subs)
420
421                 }
422         } else if subs.EpList.Size() > 0 {
423                 // Endpoint of merged subscription is being deleted
424                 c.WriteSubscriptionToDb(subs)
425                 c.UpdateCounter(cUnmergedSubscriptions)
426         }
427 }
428
429 func (r *Registry) GetSubscription(subId uint32) *Subscription {
430         r.mutex.Lock()
431         defer r.mutex.Unlock()
432         if _, ok := r.register[subId]; ok {
433                 return r.register[subId]
434         }
435         return nil
436 }
437
438 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
439         r.mutex.Lock()
440         defer r.mutex.Unlock()
441         for _, subId := range subIds {
442                 if _, ok := r.register[subId]; ok {
443                         return r.register[subId], nil
444                 }
445         }
446         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
447 }