Added REST notify error cause
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "sync"
25         "time"
26
27         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
// RESTSubscription is the registry's record of one subscription that was
// requested over the REST interface.
type RESTSubscription struct {
	xAppRmrEndPoint  string          // endpoint string stored when the record is created
	Meid             string          // Meid value stored when the record is created
	InstanceIds      []uint32        // E2 instance ids attached to this record
	xAppIdToE2Id     map[int64]int64 // xApp event instance id -> E2 event instance id
	SubReqOngoing    bool            // cleared by SetProcessed
	SubDelReqOngoing bool            // presumably marks a delete request in progress; verify against callers
}

// AddE2InstanceId appends instanceId to the list of attached E2 instance ids.
func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
	r.InstanceIds = append(r.InstanceIds, instanceId)
}

// DeleteE2InstanceId removes the first occurrence of instanceId from the list
// of attached E2 instance ids. It is a no-op when the id is not present, which
// also covers an empty list.
func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
	for i, id := range r.InstanceIds {
		if id == instanceId {
			r.InstanceIds = append(r.InstanceIds[:i], r.InstanceIds[i+1:]...)
			return
		}
	}
}

// AddXappIdToE2Id records that xAppEventInstanceID maps to e2EventInstanceID,
// replacing any previous mapping for the same xApp id.
func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
	// Writing to a nil map panics, so create it on first use.
	if r.xAppIdToE2Id == nil {
		r.xAppIdToE2Id = make(map[int64]int64)
	}
	r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
}

// GetE2IdFromXappIdToE2Id returns the E2 event instance id mapped to
// xAppEventInstanceID, or 0 when no mapping exists.
func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
	return r.xAppIdToE2Id[xAppEventInstanceID]
}

// DeleteXappIdToE2Id removes the mapping for xAppEventInstanceID, if any.
func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
	delete(r.xAppIdToE2Id, xAppEventInstanceID)
}

// SetProcessed clears the SubReqOngoing flag.
func (r *RESTSubscription) SetProcessed() {
	r.SubReqOngoing = false
}
68
69 type Registry struct {
70         mutex             sync.Mutex
71         register          map[uint32]*Subscription
72         subIds            []uint32
73         rtmgrClient       *RtmgrClient
74         restSubscriptions map[string]*RESTSubscription
75 }
76
77 func (r *Registry) Initialize() {
78         r.register = make(map[uint32]*Subscription)
79         r.restSubscriptions = make(map[string]*RESTSubscription)
80
81         var i uint32
82         for i = 1; i < 65535; i++ {
83                 r.subIds = append(r.subIds, i)
84         }
85 }
86
87 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
88         r.mutex.Lock()
89         defer r.mutex.Unlock()
90         newRestSubscription := RESTSubscription{}
91         newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
92         newRestSubscription.Meid = *maid
93         newRestSubscription.SubReqOngoing = true
94         newRestSubscription.SubDelReqOngoing = false
95         r.restSubscriptions[*restSubId] = &newRestSubscription
96         newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
97         xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
98         return &newRestSubscription, nil
99 }
100
101 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
102         r.mutex.Lock()
103         defer r.mutex.Unlock()
104         delete(r.restSubscriptions, *restSubId)
105         xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
106 }
107
108 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
109         r.mutex.Lock()
110         defer r.mutex.Unlock()
111         if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
112                 // Subscription deletion is not allowed if prosessing subscription request in not ready
113                 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
114                         if IsDelReqOngoing == true {
115                                 restSubscription.SubDelReqOngoing = true
116                         }
117                         r.restSubscriptions[restSubId] = restSubscription
118                         return restSubscription, nil
119                 } else {
120                         return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
121                 }
122                 return restSubscription, nil
123         }
124         return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
125 }
126
127 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
128         r.mutex.Lock()
129         defer r.mutex.Unlock()
130
131         resp := models.SubscriptionList{}
132         for _, subs := range r.register {
133                 subs.mutex.Lock()
134                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
135                 subs.mutex.Unlock()
136         }
137         return resp, nil
138 }
139
140 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
141         if len(r.subIds) > 0 {
142                 subId := r.subIds[0]
143                 r.subIds = r.subIds[1:]
144                 if _, ok := r.register[subId]; ok == true {
145                         r.subIds = append(r.subIds, subId)
146                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
147                 }
148                 subs := &Subscription{
149                         registry:         r,
150                         Meid:             trans.Meid,
151                         SubReqMsg:        subReqMsg,
152                         valid:            true,
153                         RetryFromXapp:    false,
154                         SubRespRcvd:      false,
155                         DeleteFromDb:     false,
156                         NoRespToXapp:     false,
157                         DoNotWaitSubResp: false,
158                 }
159                 subs.ReqId.Id = subReqMsg.RequestId.Id
160                 subs.ReqId.InstanceId = subId
161                 if resetTestFlag == true {
162                         subs.DoNotWaitSubResp = true
163                 }
164
165                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
166                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
167                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
168                 }
169                 return subs, nil
170         }
171         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
172 }
173
174 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
175
176         for _, subs := range r.register {
177                 if subs.IsMergeable(trans, subReqMsg) {
178
179                         //
180                         // check if there has been race conditions
181                         //
182                         subs.mutex.Lock()
183                         //subs has been set to invalid
184                         if subs.valid == false {
185                                 subs.mutex.Unlock()
186                                 continue
187                         }
188                         // If size is zero, entry is to be deleted
189                         if subs.EpList.Size() == 0 {
190                                 subs.mutex.Unlock()
191                                 continue
192                         }
193                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
194                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
195                                 subs.mutex.Unlock()
196                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
197                                 return subs, true
198                         }
199                         subs.mutex.Unlock()
200
201                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
202                         return subs, false
203                 }
204         }
205         return nil, false
206 }
207
208 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
209         var err error
210         var newAlloc bool
211         r.mutex.Lock()
212         defer r.mutex.Unlock()
213
214         //
215         // Check validity of subscription action types
216         //
217         actionType, err := r.CheckActionTypes(subReqMsg)
218         if err != nil {
219                 xapp.Logger.Info("CREATE %s", err)
220                 err = fmt.Errorf("E2 content validation failed")
221                 return nil, err
222         }
223
224         //
225         // Find possible existing Policy subscription
226         //
227         if actionType == e2ap.E2AP_ActionTypePolicy {
228                 if subs, ok := r.register[trans.GetSubId()]; ok {
229                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
230                         // Update message data to subscription
231                         subs.SubReqMsg = subReqMsg
232                         subs.SetCachedResponse(nil, true)
233                         return subs, nil
234                 }
235         }
236
237         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
238         if subs == nil {
239                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
240                         xapp.Logger.Error("%s", err.Error())
241                         err = fmt.Errorf("subscription not allocated")
242                         return nil, err
243                 }
244                 newAlloc = true
245         } else if endPointFound == true {
246                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
247                 subs.RetryFromXapp = true
248                 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
249                 c.UpdateCounter(cDuplicateE2SubReq)
250                 return subs, nil
251         }
252
253         //
254         // Add to subscription
255         //
256         subs.mutex.Lock()
257         defer subs.mutex.Unlock()
258
259         epamount := subs.EpList.Size()
260         xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
261
262         r.mutex.Unlock()
263         //
264         // Subscription route updates
265         //
266         if epamount == 1 {
267                 err = r.RouteCreate(subs, c)
268         } else {
269                 err = r.RouteCreateUpdate(subs, c)
270         }
271         r.mutex.Lock()
272
273         if err != nil {
274                 if newAlloc {
275                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
276                 }
277                 // Delete already added endpoint for the request
278                 subs.EpList.DelEndpoint(trans.GetEndpoint())
279                 return nil, err
280         }
281
282         if newAlloc {
283                 r.register[subs.ReqId.InstanceId] = subs
284         }
285         xapp.Logger.Debug("CREATE %s", subs.String())
286         xapp.Logger.Debug("Registry: substable=%v", r.register)
287         return subs, nil
288 }
289
290 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
291         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
292         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
293         if err != nil {
294                 c.UpdateCounter(cRouteCreateFail)
295                 xapp.Logger.Error("%s", err.Error())
296                 err = fmt.Errorf("RTMGR route create failure")
297         }
298         return err
299 }
300
301 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
302         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
303         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
304         if err != nil {
305                 c.UpdateCounter(cRouteCreateUpdateFail)
306                 xapp.Logger.Error("%s", err.Error())
307                 err = fmt.Errorf("RTMGR route update failure")
308                 return err
309         }
310         c.UpdateCounter(cMergedSubscriptions)
311         return err
312 }
313
314 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
315         var reportFound bool = false
316         var policyFound bool = false
317         var insertFound bool = false
318
319         for _, acts := range subReqMsg.ActionSetups {
320                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
321                         reportFound = true
322                 }
323                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
324                         policyFound = true
325                 }
326                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
327                         insertFound = true
328                 }
329         }
330         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
331                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
332         }
333         if reportFound == true {
334                 return e2ap.E2AP_ActionTypeReport, nil
335         }
336         if policyFound == true {
337                 return e2ap.E2AP_ActionTypePolicy, nil
338         }
339         if insertFound == true {
340                 return e2ap.E2AP_ActionTypeInsert, nil
341         }
342         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
343 }
344
345 // TODO: Works with concurrent calls, but check if can be improved
346 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
347
348         r.mutex.Lock()
349         defer r.mutex.Unlock()
350         subs.mutex.Lock()
351         defer subs.mutex.Unlock()
352
353         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
354         epamount := subs.EpList.Size()
355         subId := subs.ReqId.InstanceId
356
357         if delStatus == false {
358                 return nil
359         }
360
361         go func() {
362                 if waitRouteClean > 0 {
363                         xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
364                         time.Sleep(waitRouteClean)
365                 }
366
367                 subs.mutex.Lock()
368                 defer subs.mutex.Unlock()
369                 xapp.Logger.Info("CLEAN %s", subs.String())
370
371                 if epamount == 0 {
372                         //
373                         // Subscription route delete
374                         //
375                         r.RouteDelete(subs, trans, c)
376
377                         //
378                         // Subscription release
379                         //
380                         r.mutex.Lock()
381                         defer r.mutex.Unlock()
382
383                         if _, ok := r.register[subId]; ok {
384                                 xapp.Logger.Debug("RELEASE %s", subs.String())
385                                 delete(r.register, subId)
386                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
387                         }
388                         r.subIds = append(r.subIds, subId)
389                 } else if subs.EpList.Size() > 0 {
390                         //
391                         // Subscription route update
392                         //
393                         r.RouteDeleteUpdate(subs, c)
394                 }
395         }()
396
397         return nil
398 }
399
400 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
401         tmpList := xapp.RmrEndpointList{}
402         tmpList.AddEndpoint(trans.GetEndpoint())
403         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
404         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
405                 c.UpdateCounter(cRouteDeleteFail)
406         }
407 }
408
409 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
410         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
411         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
412                 c.UpdateCounter(cRouteDeleteUpdateFail)
413         }
414 }
415
416 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
417         r.mutex.Lock()
418         defer r.mutex.Unlock()
419         subs.mutex.Lock()
420         defer subs.mutex.Unlock()
421
422         epamount := subs.EpList.Size()
423         if epamount == 0 {
424                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
425                         // Not merged subscription is being deleted
426                         c.RemoveSubscriptionFromDb(subs)
427
428                 }
429         } else if subs.EpList.Size() > 0 {
430                 // Endpoint of merged subscription is being deleted
431                 c.WriteSubscriptionToDb(subs)
432                 c.UpdateCounter(cUnmergedSubscriptions)
433         }
434 }
435
436 func (r *Registry) GetSubscription(subId uint32) *Subscription {
437         r.mutex.Lock()
438         defer r.mutex.Unlock()
439         if _, ok := r.register[subId]; ok {
440                 return r.register[subId]
441         }
442         return nil
443 }
444
445 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
446         r.mutex.Lock()
447         defer r.mutex.Unlock()
448         for _, subId := range subIds {
449                 if _, ok := r.register[subId]; ok {
450                         return r.register[subId], nil
451                 }
452         }
453         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
454 }