eae870df2419985ba6390c04da0eadab84ab4951
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "sync"
25         "time"
26
27         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
// RESTSubscription is the registry's record of one REST-created subscription:
// who asked for it, which node it targets and which subscription instance ids
// have been attached to it.
type RESTSubscription struct {
	// xAppRmrEndPoint is the endpoint string stored when the record is created.
	xAppRmrEndPoint string
	// Meid is the identifier stored when the record is created.
	Meid string
	// InstanceIds lists the subscription instance ids attached to this record.
	InstanceIds []uint32
	// SubReqOngoing is cleared by SetProcessed.
	SubReqOngoing bool
	// SubDelReqOngoing marks that a delete request is in progress.
	SubDelReqOngoing bool
}

// AddInstanceId appends instanceId to the record's list of instance ids.
func (r *RESTSubscription) AddInstanceId(instanceId uint32) {
	r.InstanceIds = append(r.InstanceIds, instanceId)
}

// SetProcessed marks the subscription request as no longer ongoing.
func (r *RESTSubscription) SetProcessed() {
	r.SubReqOngoing = false
}

// DeleteInstanceId removes the first occurrence of instanceId from the
// record. It is a no-op when the id is not present (including when the list
// is empty).
func (r *RESTSubscription) DeleteInstanceId(instanceId uint32) {
	for i, id := range r.InstanceIds {
		if id != instanceId {
			continue
		}
		// Build a fresh slice rather than shifting elements in place, so a
		// caller that is still ranging over the previous slice value keeps
		// seeing unmodified data.
		remaining := make([]uint32, 0, len(r.InstanceIds)-1)
		remaining = append(remaining, r.InstanceIds[:i]...)
		remaining = append(remaining, r.InstanceIds[i+1:]...)
		r.InstanceIds = remaining
		return
	}
}
55
// Registry holds the subscription bookkeeping: the subscriptions keyed by
// instance id, the REST subscription records keyed by REST subscription id,
// and the pool of instance ids that are currently free.
type Registry struct {
        // mutex is locked by the exported methods before they read or modify
        // register, subIds or restSubscriptions.
        mutex             sync.Mutex
        // register maps a subscription instance id to its Subscription.
        register          map[uint32]*Subscription
        // subIds is the pool of free instance ids; ids are taken from the
        // front and appended to the back when released.
        subIds            []uint32
        // rtmgrClient is used to send subscription route create, update and
        // delete requests.
        rtmgrClient       *RtmgrClient
        // restSubscriptions maps a REST subscription id to its record.
        restSubscriptions map[string]*RESTSubscription
}
63
64 func (r *Registry) Initialize() {
65         r.register = make(map[uint32]*Subscription)
66         r.restSubscriptions = make(map[string]*RESTSubscription)
67
68         var i uint32
69         for i = 1; i < 65535; i++ {
70                 r.subIds = append(r.subIds, i)
71         }
72 }
73
74 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
75         r.mutex.Lock()
76         defer r.mutex.Unlock()
77         newRestSubscription := RESTSubscription{}
78         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
79         newRestSubscription.Meid = *maid
80         newRestSubscription.SubReqOngoing = true
81         newRestSubscription.SubDelReqOngoing = false
82         r.restSubscriptions[*restSubId] = &newRestSubscription
83         xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
84         return &newRestSubscription, nil
85 }
86
87 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
88         r.mutex.Lock()
89         defer r.mutex.Unlock()
90         delete(r.restSubscriptions, *restSubId)
91         xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
92 }
93
94 func (r *Registry) GetRESTSubscription(restSubId string) (*RESTSubscription, error) {
95         r.mutex.Lock()
96         defer r.mutex.Unlock()
97         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
98                 // Subscription deletion is not allowed if prosessing subscription request in not ready
99                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
100                         restSubscription.SubDelReqOngoing = true
101                         r.restSubscriptions[restSubId] = restSubscription
102                         return restSubscription, nil
103                 } else {
104                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
105                 }
106                 return restSubscription, nil
107         }
108         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
109 }
110
111 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
112         r.mutex.Lock()
113         defer r.mutex.Unlock()
114
115         resp := models.SubscriptionList{}
116         for _, subs := range r.register {
117                 subs.mutex.Lock()
118                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
119                 subs.mutex.Unlock()
120         }
121         return resp, nil
122 }
123
124 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
125         if len(r.subIds) > 0 {
126                 subId := r.subIds[0]
127                 r.subIds = r.subIds[1:]
128                 if _, ok := r.register[subId]; ok == true {
129                         r.subIds = append(r.subIds, subId)
130                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
131                 }
132                 subs := &Subscription{
133                         registry:         r,
134                         Meid:             trans.Meid,
135                         SubReqMsg:        subReqMsg,
136                         valid:            true,
137                         RetryFromXapp:    false,
138                         SubRespRcvd:      false,
139                         DeleteFromDb:     false,
140                         NoRespToXapp:     false,
141                         DoNotWaitSubResp: false,
142                 }
143                 subs.ReqId.Id = 123
144                 subs.ReqId.InstanceId = subId
145                 if resetTestFlag == true {
146                         subs.DoNotWaitSubResp = true
147                 }
148
149                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
150                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
151                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
152                 }
153                 return subs, nil
154         }
155         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
156 }
157
158 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
159
160         for _, subs := range r.register {
161                 if subs.IsMergeable(trans, subReqMsg) {
162
163                         //
164                         // check if there has been race conditions
165                         //
166                         subs.mutex.Lock()
167                         //subs has been set to invalid
168                         if subs.valid == false {
169                                 subs.mutex.Unlock()
170                                 continue
171                         }
172                         // If size is zero, entry is to be deleted
173                         if subs.EpList.Size() == 0 {
174                                 subs.mutex.Unlock()
175                                 continue
176                         }
177                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
178                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
179                                 subs.mutex.Unlock()
180                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
181                                 return subs, true
182                         }
183                         subs.mutex.Unlock()
184
185                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
186                         return subs, false
187                 }
188         }
189         return nil, false
190 }
191
// AssignToSubscription attaches the transaction's endpoint to a subscription
// for subReqMsg and returns that subscription.
//
// Steps, in order:
//  1. The action types of subReqMsg are validated with CheckActionTypes.
//  2. For a Policy request, a subscription already registered under
//     trans.GetSubId() is updated with the new message and returned.
//  3. Otherwise an existing mergeable subscription is looked up with
//     findExistingSubs; if none is found a new one is allocated with
//     allocateSubs. If the endpoint was already part of the found
//     subscription, it is flagged RetryFromXapp and returned as is.
//  4. A route create (exactly one endpoint) or route update (otherwise)
//     request is sent. On failure the endpoint is removed again and, for a
//     new allocation, the instance id is appended back to the free pool.
//  5. A newly allocated subscription is inserted into the register only
//     after the route request succeeded.
//
// Locking: r.mutex is held on entry and on return, but is released while the
// route request is in flight; subs.mutex is held from step 4 until return.
//
// NOTE(review): the deferred r.mutex.Unlock() relies on the explicit
// r.mutex.Lock() after the route call having run. If the route call panics
// while r.mutex is released, the deferred Unlock would act on an unlocked
// mutex - confirm the route calls cannot panic.
func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
        var err error
        var newAlloc bool
        r.mutex.Lock()
        defer r.mutex.Unlock()

        //
        // Check validity of subscription action types
        //
        actionType, err := r.CheckActionTypes(subReqMsg)
        if err != nil {
                xapp.Logger.Debug("CREATE %s", err)
                return nil, err
        }

        //
        // Find possible existing Policy subscription
        //
        if actionType == e2ap.E2AP_ActionTypePolicy {
                if subs, ok := r.register[trans.GetSubId()]; ok {
                        xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
                        // Update message data to subscription
                        subs.SubReqMsg = subReqMsg
                        subs.SetCachedResponse(nil, true)
                        return subs, nil
                }
        }

        // When a subscription is returned with endPointFound == false, the
        // endpoint has already been added to its endpoint list.
        subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
        if subs == nil {
                // Plain assignment: fills the outer subs and err.
                if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
                        return nil, err
                }
                newAlloc = true
        } else if endPointFound == true {
                // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
                subs.RetryFromXapp = true
                xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
                c.UpdateCounter(cDuplicateE2SubReq)
                return subs, nil
        }

        //
        // Add to subscription
        //
        subs.mutex.Lock()
        defer subs.mutex.Unlock()

        epamount := subs.EpList.Size()
        xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())

        // Release the registry lock for the duration of the route request;
        // subs.mutex stays held.
        r.mutex.Unlock()
        //
        // Subscription route updates
        //
        if epamount == 1 {
                err = r.RouteCreate(subs, c)
        } else {
                err = r.RouteCreateUpdate(subs, c)
        }
        // Re-acquire so that the deferred Unlock above is balanced.
        r.mutex.Lock()

        if err != nil {
                if newAlloc {
                        // The new subscription was never registered; only its id
                        // has to go back to the free pool.
                        r.subIds = append(r.subIds, subs.ReqId.InstanceId)
                }
                // Delete already added endpoint for the request
                subs.EpList.DelEndpoint(trans.GetEndpoint())
                return nil, err
        }

        if newAlloc {
                r.register[subs.ReqId.InstanceId] = subs
        }
        xapp.Logger.Debug("CREATE %s", subs.String())
        xapp.Logger.Debug("Registry: substable=%v", r.register)
        return subs, nil
}
270
271 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
272         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
273         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
274         if err != nil {
275                 c.UpdateCounter(cRouteCreateFail)
276         }
277         return err
278 }
279
280 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
281         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
282         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
283         if err != nil {
284                 c.UpdateCounter(cRouteCreateUpdateFail)
285                 return err
286         }
287         c.UpdateCounter(cMergedSubscriptions)
288         return err
289 }
290
291 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
292         var reportFound bool = false
293         var policyFound bool = false
294         var insertFound bool = false
295
296         for _, acts := range subReqMsg.ActionSetups {
297                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
298                         reportFound = true
299                 }
300                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
301                         policyFound = true
302                 }
303                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
304                         insertFound = true
305                 }
306         }
307         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
308                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
309         }
310         if reportFound == true {
311                 return e2ap.E2AP_ActionTypeReport, nil
312         }
313         if policyFound == true {
314                 return e2ap.E2AP_ActionTypePolicy, nil
315         }
316         if insertFound == true {
317                 return e2ap.E2AP_ActionTypeInsert, nil
318         }
319         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
320 }
321
// RemoveFromSubscription removes the transaction's endpoint from subs and
// starts a goroutine that performs the follow-up cleanup.
//
// If the endpoint was not in the subscription's endpoint list, nothing else
// is done. Otherwise the goroutine, after optionally sleeping for
// waitRouteClean, either
//   - deletes the route and the register entry and appends the instance id
//     back to the free pool, when the endpoint list was empty right after the
//     removal, or
//   - sends a route update, when the endpoint list is non-empty at the time
//     the goroutine runs.
//
// The function returns without waiting for the goroutine and always returns
// nil. Route failures are only reflected in counters.
//
// NOTE(review): the goroutine takes subs.mutex and then r.mutex, whereas this
// function and UpdateSubscriptionToDb take r.mutex and then subs.mutex. The
// two orders look like they could deadlock under concurrent calls - confirm.
//
// TODO: Works with concurrent calls, but check if can be improved
func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {

        r.mutex.Lock()
        defer r.mutex.Unlock()
        subs.mutex.Lock()
        defer subs.mutex.Unlock()

        // Snapshot taken under the locks; the goroutine below decides on
        // deletion from these values, not from the state at the time it runs.
        delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
        epamount := subs.EpList.Size()
        subId := subs.ReqId.InstanceId

        if delStatus == false {
                return nil
        }

        go func() {
                if waitRouteClean > 0 {
                        xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
                        time.Sleep(waitRouteClean)
                }

                subs.mutex.Lock()
                defer subs.mutex.Unlock()
                xapp.Logger.Info("CLEAN %s", subs.String())

                if epamount == 0 {
                        //
                        // Subscription route delete
                        //
                        r.RouteDelete(subs, trans, c)

                        //
                        // Subscription release
                        //
                        r.mutex.Lock()
                        defer r.mutex.Unlock()

                        if _, ok := r.register[subId]; ok {
                                xapp.Logger.Debug("RELEASE %s", subs.String())
                                delete(r.register, subId)
                                xapp.Logger.Debug("Registry: substable=%v", r.register)
                        }
                        // The id goes back to the pool whether or not the entry
                        // was still registered.
                        r.subIds = append(r.subIds, subId)
                } else if subs.EpList.Size() > 0 {
                        //
                        // Subscription route update
                        //
                        r.RouteDeleteUpdate(subs, c)
                }
        }()

        return nil
}
376
377 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
378         tmpList := xapp.RmrEndpointList{}
379         tmpList.AddEndpoint(trans.GetEndpoint())
380         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
381         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
382                 c.UpdateCounter(cRouteDeleteFail)
383         }
384 }
385
386 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
387         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
388         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
389                 c.UpdateCounter(cRouteDeleteUpdateFail)
390         }
391 }
392
393 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
394         r.mutex.Lock()
395         defer r.mutex.Unlock()
396         subs.mutex.Lock()
397         defer subs.mutex.Unlock()
398
399         epamount := subs.EpList.Size()
400         if epamount == 0 {
401                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
402                         // Not merged subscription is being deleted
403                         c.RemoveSubscriptionFromDb(subs)
404
405                 }
406         } else if subs.EpList.Size() > 0 {
407                 // Endpoint of merged subscription is being deleted
408                 c.WriteSubscriptionToDb(subs)
409                 c.UpdateCounter(cUnmergedSubscriptions)
410         }
411 }
412
413 func (r *Registry) GetSubscription(subId uint32) *Subscription {
414         r.mutex.Lock()
415         defer r.mutex.Unlock()
416         if _, ok := r.register[subId]; ok {
417                 return r.register[subId]
418         }
419         return nil
420 }
421
422 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
423         r.mutex.Lock()
424         defer r.mutex.Unlock()
425         for _, subId := range subIds {
426                 if _, ok := r.register[subId]; ok {
427                         return r.register[subId], nil
428                 }
429         }
430         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
431 }