Xapp-frame updated to v0.8.1.
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "sync"
28         "time"
29 )
30
31 //-----------------------------------------------------------------------------
32 //
33 //-----------------------------------------------------------------------------
34
35 type Registry struct {
36         mutex       sync.Mutex
37         register    map[uint32]*Subscription
38         subIds      []uint32
39         rtmgrClient *RtmgrClient
40 }
41
42 func (r *Registry) Initialize() {
43         r.register = make(map[uint32]*Subscription)
44         var i uint32
45         for i = 1; i < 65535; i++ {
46                 r.subIds = append(r.subIds, i)
47         }
48 }
49
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
51         r.mutex.Lock()
52         defer r.mutex.Unlock()
53
54         resp := models.SubscriptionList{}
55         for _, subs := range r.register {
56                 subs.mutex.Lock()
57                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
58                 subs.mutex.Unlock()
59         }
60         return resp, nil
61 }
62
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
64         if len(r.subIds) > 0 {
65                 subId := r.subIds[0]
66                 r.subIds = r.subIds[1:]
67                 if _, ok := r.register[subId]; ok == true {
68                         r.subIds = append(r.subIds, subId)
69                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
70                 }
71                 subs := &Subscription{
72                         registry:         r,
73                         Meid:             trans.Meid,
74                         SubReqMsg:        subReqMsg,
75                         valid:            true,
76                         RetryFromXapp:    false,
77                         SubRespRcvd:      false,
78                         DeleteFromDb:     false,
79                         NoRespToXapp:     false,
80                         DoNotWaitSubResp: false,
81                 }
82                 subs.ReqId.Id = 123
83                 subs.ReqId.InstanceId = subId
84                 if resetTestFlag == true {
85                         subs.DoNotWaitSubResp = true
86                 }
87
88                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
89                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
90                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
91                 }
92                 return subs, nil
93         }
94         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
95 }
96
97 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
98
99         for _, subs := range r.register {
100                 if subs.IsMergeable(trans, subReqMsg) {
101
102                         //
103                         // check if there has been race conditions
104                         //
105                         subs.mutex.Lock()
106                         //subs has been set to invalid
107                         if subs.valid == false {
108                                 subs.mutex.Unlock()
109                                 continue
110                         }
111                         // If size is zero, entry is to be deleted
112                         if subs.EpList.Size() == 0 {
113                                 subs.mutex.Unlock()
114                                 continue
115                         }
116                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
117                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
118                                 subs.mutex.Unlock()
119                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
120                                 return subs, true
121                         }
122                         subs.mutex.Unlock()
123
124                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
125                         return subs, false
126                 }
127         }
128         return nil, false
129 }
130
131 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
132         var err error
133         var newAlloc bool
134         r.mutex.Lock()
135         defer r.mutex.Unlock()
136
137         //
138         // Check validity of subscription action types
139         //
140         actionType, err := r.CheckActionTypes(subReqMsg)
141         if err != nil {
142                 xapp.Logger.Debug("CREATE %s", err)
143                 return nil, err
144         }
145
146         //
147         // Find possible existing Policy subscription
148         //
149         if actionType == e2ap.E2AP_ActionTypePolicy {
150                 if subs, ok := r.register[trans.GetSubId()]; ok {
151                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
152                         // Update message data to subscription
153                         subs.SubReqMsg = subReqMsg
154                         subs.SetCachedResponse(nil, true)
155                         return subs, nil
156                 }
157         }
158
159         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
160         if subs == nil {
161                 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
162                         return nil, err
163                 }
164                 newAlloc = true
165         } else if endPointFound == true {
166                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
167                 subs.RetryFromXapp = true
168                 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
169                 return subs, nil
170         }
171
172         //
173         // Add to subscription
174         //
175         subs.mutex.Lock()
176         defer subs.mutex.Unlock()
177
178         epamount := subs.EpList.Size()
179         xapp.Logger.Info("AssignToSubscription subs.EpList.Size() = %v", subs.EpList.Size())
180
181         r.mutex.Unlock()
182         //
183         // Subscription route updates
184         //
185         if epamount == 1 {
186                 err = r.RouteCreate(subs, c)
187         } else {
188                 err = r.RouteCreateUpdate(subs, c)
189         }
190         r.mutex.Lock()
191
192         if err != nil {
193                 if newAlloc {
194                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
195                 }
196                 // Delete already added endpoint for the request
197                 subs.EpList.DelEndpoint(trans.GetEndpoint())
198                 return nil, err
199         }
200
201         if newAlloc {
202                 r.register[subs.ReqId.InstanceId] = subs
203         }
204         xapp.Logger.Debug("CREATE %s", subs.String())
205         xapp.Logger.Debug("Registry: substable=%v", r.register)
206         return subs, nil
207 }
208
209 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
210         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
211         err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
212         if err != nil {
213                 c.UpdateCounter(cRouteCreateFail)
214         }
215         return err
216 }
217
218 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
219         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
220         err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
221         if err != nil {
222                 c.UpdateCounter(cRouteCreateUpdateFail)
223                 return err
224         }
225         c.UpdateCounter(cMergedSubscriptions)
226         return err
227 }
228
229 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
230         var reportFound bool = false
231         var policyFound bool = false
232         var insertFound bool = false
233
234         for _, acts := range subReqMsg.ActionSetups {
235                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
236                         reportFound = true
237                 }
238                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
239                         policyFound = true
240                 }
241                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
242                         insertFound = true
243                 }
244         }
245         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
246                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
247         }
248         if reportFound == true {
249                 return e2ap.E2AP_ActionTypeReport, nil
250         }
251         if policyFound == true {
252                 return e2ap.E2AP_ActionTypePolicy, nil
253         }
254         if insertFound == true {
255                 return e2ap.E2AP_ActionTypeInsert, nil
256         }
257         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
258 }
259
260 // TODO: Works with concurrent calls, but check if can be improved
261 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
262
263         r.mutex.Lock()
264         defer r.mutex.Unlock()
265         subs.mutex.Lock()
266         defer subs.mutex.Unlock()
267
268         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
269         epamount := subs.EpList.Size()
270         subId := subs.ReqId.InstanceId
271
272         if delStatus == false {
273                 return nil
274         }
275
276         go func() {
277                 if waitRouteClean > 0 {
278                         time.Sleep(waitRouteClean)
279                 }
280
281                 subs.mutex.Lock()
282                 defer subs.mutex.Unlock()
283                 xapp.Logger.Info("CLEAN %s", subs.String())
284
285                 if epamount == 0 {
286                         //
287                         // Subscription route delete
288                         //
289                         r.RouteDelete(subs, trans, c)
290
291                         //
292                         // Subscription release
293                         //
294                         r.mutex.Lock()
295                         defer r.mutex.Unlock()
296
297                         if _, ok := r.register[subId]; ok {
298                                 xapp.Logger.Debug("RELEASE %s", subs.String())
299                                 delete(r.register, subId)
300                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
301                         }
302                         r.subIds = append(r.subIds, subId)
303                 } else if subs.EpList.Size() > 0 {
304                         //
305                         // Subscription route update
306                         //
307                         r.RouteDeleteUpdate(subs, c)
308                 }
309         }()
310
311         return nil
312 }
313
314 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
315         tmpList := xapp.RmrEndpointList{}
316         tmpList.AddEndpoint(trans.GetEndpoint())
317         subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
318         if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
319                 c.UpdateCounter(cRouteDeleteFail)
320         }
321 }
322
323 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
324         subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
325         if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
326                 c.UpdateCounter(cRouteDeleteUpdateFail)
327         }
328 }
329
330 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
331         r.mutex.Lock()
332         defer r.mutex.Unlock()
333         subs.mutex.Lock()
334         defer subs.mutex.Unlock()
335
336         epamount := subs.EpList.Size()
337         if epamount == 0 {
338                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
339                         // Not merged subscription is being deleted
340                         c.RemoveSubscriptionFromDb(subs)
341
342                 }
343         } else if subs.EpList.Size() > 0 {
344                 // Endpoint of merged subscription is being deleted
345                 c.WriteSubscriptionToDb(subs)
346                 c.UpdateCounter(cUnmergedSubscriptions)
347         }
348 }
349
350 func (r *Registry) GetSubscription(subId uint32) *Subscription {
351         r.mutex.Lock()
352         defer r.mutex.Unlock()
353         if _, ok := r.register[subId]; ok {
354                 return r.register[subId]
355         }
356         return nil
357 }
358
359 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
360         r.mutex.Lock()
361         defer r.mutex.Unlock()
362         for _, subId := range subIds {
363                 if _, ok := r.register[subId]; ok {
364                         return r.register[subId], nil
365                 }
366         }
367         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
368 }