FCA RIC-1353 xApp restart
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "sync"
28         "time"
29 )
30
31 //-----------------------------------------------------------------------------
32 //
33 //-----------------------------------------------------------------------------
34
35 type Registry struct {
36         mutex       sync.Mutex
37         register    map[uint32]*Subscription
38         subIds      []uint32
39         rtmgrClient *RtmgrClient
40 }
41
42 func (r *Registry) Initialize() {
43         r.register = make(map[uint32]*Subscription)
44         var i uint32
45         for i = 0; i < 65535; i++ {
46                 r.subIds = append(r.subIds, i+1)
47         }
48 }
49
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
51         r.mutex.Lock()
52         defer r.mutex.Unlock()
53
54         resp := models.SubscriptionList{}
55         for _, subs := range r.register {
56                 subs.mutex.Lock()
57                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
58                 subs.mutex.Unlock()
59         }
60         return resp, nil
61 }
62
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64         if len(r.subIds) > 0 {
65                 subId := r.subIds[0]
66                 r.subIds = r.subIds[1:]
67                 if _, ok := r.register[subId]; ok == true {
68                         r.subIds = append(r.subIds, subId)
69                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
70                 }
71                 subs := &Subscription{
72                         registry:  r,
73                         Meid:      trans.Meid,
74                         SubReqMsg: subReqMsg,
75                         valid:     true,
76                 }
77                 subs.ReqId.Id = 123
78                 subs.ReqId.InstanceId = subId
79
80                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
82                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
83                 }
84
85                 return subs, nil
86         }
87         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
88 }
89
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
91
92         for _, subs := range r.register {
93                 if subs.IsMergeable(trans, subReqMsg) {
94
95                         //
96                         // check if there has been race conditions
97                         //
98                         subs.mutex.Lock()
99                         //subs has been set to invalid
100                         if subs.valid == false {
101                                 subs.mutex.Unlock()
102                                 continue
103                         }
104                         // If size is zero, entry is to be deleted
105                         if subs.EpList.Size() == 0 {
106                                 subs.mutex.Unlock()
107                                 continue
108                         }
109                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
110                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
111                                 subs.mutex.Unlock()
112                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
113                                 return subs, true
114                         }
115                         subs.mutex.Unlock()
116
117                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
118                         return subs, false
119                 }
120         }
121         return nil, false
122 }
123
124 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
125         var err error
126         var newAlloc bool
127         r.mutex.Lock()
128         defer r.mutex.Unlock()
129
130         //
131         // Check validity of subscription action types
132         //
133         actionType, err := r.CheckActionTypes(subReqMsg)
134         if err != nil {
135                 xapp.Logger.Debug("CREATE %s", err)
136                 return nil, err
137         }
138
139         //
140         // Find possible existing Policy subscription
141         //
142         if actionType == e2ap.E2AP_ActionTypePolicy {
143                 if subs, ok := r.register[trans.GetSubId()]; ok {
144                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
145                         // Update message data to subscription
146                         subs.SubReqMsg = subReqMsg
147                         subs.SetCachedResponse(nil, true)
148                         return subs, nil
149                 }
150         }
151
152         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
153         if subs == nil {
154                 subs, err = r.allocateSubs(trans, subReqMsg)
155                 if err != nil {
156                         return nil, err
157                 }
158                 newAlloc = true
159         } else if endPointFound == true {
160                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
161                 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
162                 xapp.Logger.Debug("Registry: substable=%v", r.register)
163                 return subs, nil
164         }
165
166         //
167         // Add to subscription
168         //
169         subs.mutex.Lock()
170         defer subs.mutex.Unlock()
171
172         epamount := subs.EpList.Size()
173
174         r.mutex.Unlock()
175         //
176         // Subscription route updates
177         //
178         if epamount == 1 {
179                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
180                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
181         } else {
182                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
183                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
184         }
185         r.mutex.Lock()
186
187         if err != nil {
188                 if newAlloc {
189                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
190                 }
191                 return nil, err
192         }
193
194         if newAlloc {
195                 r.register[subs.ReqId.InstanceId] = subs
196         }
197         xapp.Logger.Debug("CREATE %s", subs.String())
198         xapp.Logger.Debug("Registry: substable=%v", r.register)
199         return subs, nil
200 }
201
202 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
203         var reportFound bool = false
204         var policyFound bool = false
205         var insertFound bool = false
206
207         for _, acts := range subReqMsg.ActionSetups {
208                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
209                         reportFound = true
210                 }
211                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
212                         policyFound = true
213                 }
214                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
215                         insertFound = true
216                 }
217         }
218         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
219                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
220         }
221         if reportFound == true {
222                 return e2ap.E2AP_ActionTypeReport, nil
223         }
224         if policyFound == true {
225                 return e2ap.E2AP_ActionTypePolicy, nil
226         }
227         if insertFound == true {
228                 return e2ap.E2AP_ActionTypeInsert, nil
229         }
230         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
231 }
232
233 // TODO: Works with concurrent calls, but check if can be improved
234 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
235
236         r.mutex.Lock()
237         defer r.mutex.Unlock()
238         subs.mutex.Lock()
239         defer subs.mutex.Unlock()
240
241         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
242         epamount := subs.EpList.Size()
243         subId := subs.ReqId.InstanceId
244
245         if delStatus == false {
246                 return nil
247         }
248
249         go func() {
250                 if waitRouteClean > 0 {
251                         time.Sleep(waitRouteClean)
252                 }
253
254                 subs.mutex.Lock()
255                 defer subs.mutex.Unlock()
256                 xapp.Logger.Info("CLEAN %s", subs.String())
257
258                 if epamount == 0 {
259                         //
260                         // Subscription route delete
261                         //
262                         tmpList := xapp.RmrEndpointList{}
263                         tmpList.AddEndpoint(trans.GetEndpoint())
264                         subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
265                         r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
266
267                         //
268                         // Subscription release
269                         //
270                         r.mutex.Lock()
271                         defer r.mutex.Unlock()
272
273                         if _, ok := r.register[subId]; ok {
274                                 xapp.Logger.Debug("RELEASE %s", subs.String())
275                                 delete(r.register, subId)
276                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
277                         }
278                         r.subIds = append(r.subIds, subId)
279
280                 } else if subs.EpList.Size() > 0 {
281                         //
282                         // Subscription route updates
283                         //
284                         subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
285                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
286                 }
287
288         }()
289
290         return nil
291 }
292
293 func (r *Registry) GetSubscription(subId uint32) *Subscription {
294         r.mutex.Lock()
295         defer r.mutex.Unlock()
296         if _, ok := r.register[subId]; ok {
297                 return r.register[subId]
298         }
299         return nil
300 }
301
302 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
303         r.mutex.Lock()
304         defer r.mutex.Unlock()
305         for _, subId := range subIds {
306                 if _, ok := r.register[subId]; ok {
307                         return r.register[subId], nil
308                 }
309         }
310         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
311 }