Submgr restart improvement
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "sync"
28         "time"
29 )
30
31 //-----------------------------------------------------------------------------
32 //
33 //-----------------------------------------------------------------------------
34
// Registry keeps track of the subscriptions known to this process and of the
// subscription instance ids that are still free to hand out.
type Registry struct {
	// mutex is locked by the exported Registry methods while they read or
	// modify register and subIds.
	mutex sync.Mutex
	// register maps a subscription instance id (Subscription.ReqId.InstanceId)
	// to its Subscription.
	register map[uint32]*Subscription
	// subIds is the pool of free instance ids; ids are taken from the front
	// and returned to the back.
	subIds []uint32
	// rtmgrClient is used to create, update and delete subscription routes.
	rtmgrClient *RtmgrClient
}
41
42 func (r *Registry) Initialize() {
43         r.register = make(map[uint32]*Subscription)
44         var i uint32
45         for i = 1; i < 65535; i++ {
46                 r.subIds = append(r.subIds, i)
47         }
48 }
49
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
51         r.mutex.Lock()
52         defer r.mutex.Unlock()
53
54         resp := models.SubscriptionList{}
55         for _, subs := range r.register {
56                 subs.mutex.Lock()
57                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
58                 subs.mutex.Unlock()
59         }
60         return resp, nil
61 }
62
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
64         if len(r.subIds) > 0 {
65                 subId := r.subIds[0]
66                 r.subIds = r.subIds[1:]
67                 if _, ok := r.register[subId]; ok == true {
68                         r.subIds = append(r.subIds, subId)
69                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
70                 }
71                 subs := &Subscription{
72                         registry:         r,
73                         Meid:             trans.Meid,
74                         SubReqMsg:        subReqMsg,
75                         valid:            true,
76                         RetryFromXapp:    false,
77                         SubRespRcvd:      false,
78                         DeleteFromDb:     false,
79                         NoRespToXapp:     false,
80                         DoNotWaitSubResp: false,
81                 }
82                 subs.ReqId.Id = 123
83                 subs.ReqId.InstanceId = subId
84                 if resetTestFlag == true {
85                         subs.DoNotWaitSubResp = true
86                 }
87
88                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
89                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
90                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
91                 }
92                 return subs, nil
93         }
94         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
95 }
96
97 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
98
99         for _, subs := range r.register {
100                 if subs.IsMergeable(trans, subReqMsg) {
101
102                         //
103                         // check if there has been race conditions
104                         //
105                         subs.mutex.Lock()
106                         //subs has been set to invalid
107                         if subs.valid == false {
108                                 subs.mutex.Unlock()
109                                 continue
110                         }
111                         // If size is zero, entry is to be deleted
112                         if subs.EpList.Size() == 0 {
113                                 subs.mutex.Unlock()
114                                 continue
115                         }
116                         // Try to add to endpointlist. Adding fails if endpoint is already in the list
117                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
118                                 subs.mutex.Unlock()
119                                 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
120                                 return subs, true
121                         }
122                         subs.mutex.Unlock()
123
124                         xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
125                         return subs, false
126                 }
127         }
128         return nil, false
129 }
130
131 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
132         var err error
133         var newAlloc bool
134         r.mutex.Lock()
135         defer r.mutex.Unlock()
136
137         //
138         // Check validity of subscription action types
139         //
140         actionType, err := r.CheckActionTypes(subReqMsg)
141         if err != nil {
142                 xapp.Logger.Debug("CREATE %s", err)
143                 return nil, err
144         }
145
146         //
147         // Find possible existing Policy subscription
148         //
149         if actionType == e2ap.E2AP_ActionTypePolicy {
150                 if subs, ok := r.register[trans.GetSubId()]; ok {
151                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
152                         // Update message data to subscription
153                         subs.SubReqMsg = subReqMsg
154                         subs.SetCachedResponse(nil, true)
155                         return subs, nil
156                 }
157         }
158
159         subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
160         if subs == nil {
161                 subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag)
162                 if err != nil {
163                         return nil, err
164                 }
165                 newAlloc = true
166         } else if endPointFound == true {
167                 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
168                 subs.RetryFromXapp = true
169                 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
170                 //xapp.Logger.Debug("Registry: substable=%v", r.register)
171                 return subs, nil
172         }
173
174         //
175         // Add to subscription
176         //
177         subs.mutex.Lock()
178         defer subs.mutex.Unlock()
179
180         epamount := subs.EpList.Size()
181
182         r.mutex.Unlock()
183         //
184         // Subscription route updates
185         //
186         if epamount == 1 {
187                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
188                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
189         } else {
190                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
191                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
192         }
193         r.mutex.Lock()
194
195         if err != nil {
196                 if newAlloc {
197                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
198                 }
199                 return nil, err
200         }
201
202         if newAlloc {
203                 r.register[subs.ReqId.InstanceId] = subs
204         }
205         xapp.Logger.Debug("CREATE %s", subs.String())
206         xapp.Logger.Debug("Registry: substable=%v", r.register)
207         return subs, nil
208 }
209
210 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
211         var reportFound bool = false
212         var policyFound bool = false
213         var insertFound bool = false
214
215         for _, acts := range subReqMsg.ActionSetups {
216                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
217                         reportFound = true
218                 }
219                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
220                         policyFound = true
221                 }
222                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
223                         insertFound = true
224                 }
225         }
226         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
227                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
228         }
229         if reportFound == true {
230                 return e2ap.E2AP_ActionTypeReport, nil
231         }
232         if policyFound == true {
233                 return e2ap.E2AP_ActionTypePolicy, nil
234         }
235         if insertFound == true {
236                 return e2ap.E2AP_ActionTypeInsert, nil
237         }
238         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
239 }
240
241 // TODO: Works with concurrent calls, but check if can be improved
242 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
243
244         r.mutex.Lock()
245         defer r.mutex.Unlock()
246         subs.mutex.Lock()
247         defer subs.mutex.Unlock()
248
249         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
250         epamount := subs.EpList.Size()
251         subId := subs.ReqId.InstanceId
252
253         if delStatus == false {
254                 return nil
255         }
256
257         go func() {
258                 if waitRouteClean > 0 {
259                         time.Sleep(waitRouteClean)
260                 }
261
262                 subs.mutex.Lock()
263                 defer subs.mutex.Unlock()
264                 xapp.Logger.Info("CLEAN %s", subs.String())
265
266                 if epamount == 0 {
267                         //
268                         // Subscription route delete
269                         //
270                         tmpList := xapp.RmrEndpointList{}
271                         tmpList.AddEndpoint(trans.GetEndpoint())
272                         subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
273                         r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
274
275                         //
276                         // Subscription release
277                         //
278                         r.mutex.Lock()
279                         defer r.mutex.Unlock()
280
281                         if _, ok := r.register[subId]; ok {
282                                 xapp.Logger.Debug("RELEASE %s", subs.String())
283                                 delete(r.register, subId)
284                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
285                         }
286                         r.subIds = append(r.subIds, subId)
287                 } else if subs.EpList.Size() > 0 {
288                         //
289                         // Subscription route updates
290                         //
291                         subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
292                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
293                 }
294         }()
295
296         return nil
297 }
298
299 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
300         r.mutex.Lock()
301         defer r.mutex.Unlock()
302         subs.mutex.Lock()
303         defer subs.mutex.Unlock()
304
305         epamount := subs.EpList.Size()
306         if epamount == 0 {
307                 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
308                         // Not merged subscription is being deleted
309                         c.RemoveSubscriptionFromDb(subs)
310
311                 }
312         } else if subs.EpList.Size() > 0 {
313                 // Endpoint of merged subscription is being deleted
314                 c.WriteSubscriptionToDb(subs)
315         }
316 }
317
318 func (r *Registry) GetSubscription(subId uint32) *Subscription {
319         r.mutex.Lock()
320         defer r.mutex.Unlock()
321         if _, ok := r.register[subId]; ok {
322                 return r.register[subId]
323         }
324         return nil
325 }
326
327 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
328         r.mutex.Lock()
329         defer r.mutex.Unlock()
330         for _, subId := range subIds {
331                 if _, ok := r.register[subId]; ok {
332                         return r.register[subId], nil
333                 }
334         }
335         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
336 }