Xapp-frame, v0.8.1 Rest Subscription Creation /Query /Deletion
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "sync"
25         "time"
26
27         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
36 type RESTSubscription struct {
37         xAppRmrEndPoint  string
38         Meid             string
39         InstanceIds      []uint32
40         SubReqOngoing    bool
41         SubDelReqOngoing bool
42 }
43
44 func (r *RESTSubscription) AddInstanceId(instanceId uint32) {
45         r.InstanceIds = append(r.InstanceIds, instanceId)
46 }
47
48 func (r *RESTSubscription) SetProcessed() {
49         r.SubReqOngoing = false
50 }
51
52 func (r *RESTSubscription) DeleteInstanceId(instanceId uint32) {
53         r.InstanceIds = r.InstanceIds[1:]
54 }
55
56 type Registry struct {
57         mutex             sync.Mutex
58         register          map[uint32]*Subscription
59         subIds            []uint32
60         rtmgrClient       *RtmgrClient
61         restSubscriptions map[string]*RESTSubscription
62 }
63
64 func (r *Registry) Initialize() {
65         r.register = make(map[uint32]*Subscription)
66         r.restSubscriptions = make(map[string]*RESTSubscription)
67
68         var i uint32
69         for i = 1; i < 65535; i++ {
70                 r.subIds = append(r.subIds, i)
71         }
72 }
73
74 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
75         r.mutex.Lock()
76         defer r.mutex.Unlock()
77         newRestSubscription := RESTSubscription{}
78         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
79         newRestSubscription.Meid = *maid
80         newRestSubscription.SubReqOngoing = true
81         newRestSubscription.SubDelReqOngoing = false
82         r.restSubscriptions[*restSubId] = &newRestSubscription
83         xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apCubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
84         return &newRestSubscription, nil
85 }
86
87 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
88         r.mutex.Lock()
89         defer r.mutex.Unlock()
90         delete(r.restSubscriptions, *restSubId)
91         xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
92 }
93
94 func (r *Registry) GetRESTSubscription(restSubId string) (*RESTSubscription, error) {
95         r.mutex.Lock()
96         defer r.mutex.Unlock()
97         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
98                 // Subscription deletion is not allowed if prosessing subscription request in not ready
99                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
100                         restSubscription.SubDelReqOngoing = true
101                         r.restSubscriptions[restSubId] = restSubscription
102                         return restSubscription, nil
103                 } else {
104                         return restSubscription, fmt.Errorf("Registry: REST delete request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
105                 }
106                 return restSubscription, nil
107         }
108         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
109 }
110
111 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
112         r.mutex.Lock()
113         defer r.mutex.Unlock()
114
115         resp := models.SubscriptionList{}
116         for _, subs := range r.register {
117                 subs.mutex.Lock()
118                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
119                 subs.mutex.Unlock()
120         }
121         return resp, nil
122 }
123
124 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
125         if len(r.subIds) > 0 {
126                 subId := r.subIds[0]
127                 r.subIds = r.subIds[1:]
128                 if _, ok := r.register[subId]; ok == true {
129                         r.subIds = append(r.subIds, subId)
130                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
131                 }
132                 subs := &Subscription{
133                         registry:         r,
134                         Meid:             trans.Meid,
135                         SubReqMsg:        subReqMsg,
136                         valid:            true,
137                         RetryFromXapp:    false,
138                         SubRespRcvd:      false,
139                         DeleteFromDb:     false,
140                         NoRespToXapp:     false,
141                         DoNotWaitSubResp: false,
142                 }
143                 subs.ReqId.Id = 123
144                 subs.ReqId.InstanceId = subId
145                 if resetTestFlag == true {
146                         subs.DoNotWaitSubResp = true
147                 }
148
149                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
150                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
151                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
152                 }
153                 return subs, nil
154         }
155         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
156 }
157
158 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
159
160         for _, subs := range r.register {
161                 if subs.IsMergeable(trans, subReqMsg) {
162
163                         //
164                         // check if there has been race conditions
165                         //
166                         subs.mutex.Lock()
167                         //subs has been set to invalid
168                         if subs.valid == false {
169                                 subs.mutex.Unlock()
170                                 continue
171                         }
172                         // If size is zero, entry is to be deleted
173                         if subs.EpList.Size() == 0 {
174                                 subs.mutex.Unlock()
175                                 continue
176                         }
177                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
178                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
179                                 subs.mutex.Unlock()
180                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
181                                 return subs, true
182                         }
183                         subs.mutex.Unlock()
184
185                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
186                         return subs, false
187                 }
188         }
189         return nil, false
190 }
191
192 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
193         var err error
194         var newAlloc bool
195         r.mutex.Lock()
196         defer r.mutex.Unlock()
197
198         //
199         // Check validity of subscription action types
200         //
201         actionType, err := r.CheckActionTypes(subReqMsg)
202         if err != nil {
203                 xapp.Logger.Debug("CREATE %s", err)
204                 return nil, err
205         }
206
207         //
208         // Find possible existing Policy subscription
209         //
210         if actionType == e2ap.E2AP_ActionTypePolicy {
211                 if subs, ok := r.register[trans.GetSubId()]; ok {
212                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
213                         // Update message data to subscription
214                         subs.SubReqMsg = subReqMsg
215                         subs.SetCachedResponse(nil, true)
216                         return subs, nil
217                 }
218         }
219
220         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
221         if subs == nil {
222                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
223                         return nil, err
224                 }
225                 newAlloc = true
226         } else if endPointFound == true {
227                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
228                 subs.RetryFromXapp = true
229                 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
230                 return subs, nil
231         }
232
233         //
234         // Add to subscription
235         //
236         subs.mutex.Lock()
237         defer subs.mutex.Unlock()
238
239         epamount := subs.EpList.Size()
240         xapp.Logger.Info("AssignToSubscription subs.EpList.Size() = %v", subs.EpList.Size())
241
242         r.mutex.Unlock()
243         //
244         // Subscription route updates
245         //
246         if epamount == 1 {
247                 err = r.RouteCreate(subs, c)
248         } else {
249                 err = r.RouteCreateUpdate(subs, c)
250         }
251         r.mutex.Lock()
252
253         if err != nil {
254                 if newAlloc {
255                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
256                 }
257                 // Delete already added endpoint for the request
258                 subs.EpList.DelEndpoint(trans.GetEndpoint())
259                 return nil, err
260         }
261
262         if newAlloc {
263                 r.register[subs.ReqId.InstanceId] = subs
264         }
265         xapp.Logger.Debug("CREATE %s", subs.String())
266         xapp.Logger.Debug("Registry: substable=%v", r.register)
267         return subs, nil
268 }
269
270 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
271         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
272         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
273         if err != nil {
274                 c.UpdateCounter(cRouteCreateFail)
275         }
276         return err
277 }
278
279 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
280         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
281         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
282         if err != nil {
283                 c.UpdateCounter(cRouteCreateUpdateFail)
284                 return err
285         }
286         c.UpdateCounter(cMergedSubscriptions)
287         return err
288 }
289
290 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
291         var reportFound bool = false
292         var policyFound bool = false
293         var insertFound bool = false
294
295         for _, acts := range subReqMsg.ActionSetups {
296                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
297                         reportFound = true
298                 }
299                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
300                         policyFound = true
301                 }
302                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
303                         insertFound = true
304                 }
305         }
306         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
307                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
308         }
309         if reportFound == true {
310                 return e2ap.E2AP_ActionTypeReport, nil
311         }
312         if policyFound == true {
313                 return e2ap.E2AP_ActionTypePolicy, nil
314         }
315         if insertFound == true {
316                 return e2ap.E2AP_ActionTypeInsert, nil
317         }
318         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
319 }
320
321 // TODO: Works with concurrent calls, but check if can be improved
322 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
323
324         r.mutex.Lock()
325         defer r.mutex.Unlock()
326         subs.mutex.Lock()
327         defer subs.mutex.Unlock()
328
329         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
330         epamount := subs.EpList.Size()
331         subId := subs.ReqId.InstanceId
332
333         if delStatus == false {
334                 return nil
335         }
336
337         go func() {
338                 if waitRouteClean > 0 {
339                         time.Sleep(waitRouteClean)
340                 }
341
342                 subs.mutex.Lock()
343                 defer subs.mutex.Unlock()
344                 xapp.Logger.Info("CLEAN %s", subs.String())
345
346                 if epamount == 0 {
347                         //
348                         // Subscription route delete
349                         //
350                         r.RouteDelete(subs, trans, c)
351
352                         //
353                         // Subscription release
354                         //
355                         r.mutex.Lock()
356                         defer r.mutex.Unlock()
357
358                         if _, ok := r.register[subId]; ok {
359                                 xapp.Logger.Debug("RELEASE %s", subs.String())
360                                 delete(r.register, subId)
361                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
362                         }
363                         r.subIds = append(r.subIds, subId)
364                 } else if subs.EpList.Size() > 0 {
365                         //
366                         // Subscription route update
367                         //
368                         r.RouteDeleteUpdate(subs, c)
369                 }
370         }()
371
372         return nil
373 }
374
375 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
376         tmpList := xapp.RmrEndpointList{}
377         tmpList.AddEndpoint(trans.GetEndpoint())
378         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
379         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
380                 c.UpdateCounter(cRouteDeleteFail)
381         }
382 }
383
384 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
385         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
386         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
387                 c.UpdateCounter(cRouteDeleteUpdateFail)
388         }
389 }
390
391 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
392         r.mutex.Lock()
393         defer r.mutex.Unlock()
394         subs.mutex.Lock()
395         defer subs.mutex.Unlock()
396
397         epamount := subs.EpList.Size()
398         if epamount == 0 {
399                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
400                         // Not merged subscription is being deleted
401                         c.RemoveSubscriptionFromDb(subs)
402
403                 }
404         } else if subs.EpList.Size() > 0 {
405                 // Endpoint of merged subscription is being deleted
406                 c.WriteSubscriptionToDb(subs)
407                 c.UpdateCounter(cUnmergedSubscriptions)
408         }
409 }
410
411 func (r *Registry) GetSubscription(subId uint32) *Subscription {
412         r.mutex.Lock()
413         defer r.mutex.Unlock()
414         if _, ok := r.register[subId]; ok {
415                 return r.register[subId]
416         }
417         return nil
418 }
419
420 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
421         r.mutex.Lock()
422         defer r.mutex.Unlock()
423         for _, subId := range subIds {
424                 if _, ok := r.register[subId]; ok {
425                         return r.register[subId], nil
426                 }
427         }
428         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
429 }