9e4eaaa6c2235d2cf507e6f1b0bc4662a66de9ea
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "sync"
28         "time"
29 )
30
31 //-----------------------------------------------------------------------------
32 //
33 //-----------------------------------------------------------------------------
34
// Registry keeps track of the active subscriptions and of the pool of
// subscription instance ids that are still free to be handed out.
type Registry struct {
	mutex       sync.Mutex               // taken by the exported methods before they touch register or subIds
	register    map[uint32]*Subscription // active subscriptions, keyed by their ReqId.InstanceId
	subIds      []uint32                 // free instance ids: taken from the front, returned to the back
	rtmgrClient *RtmgrClient             // used for the subscription route create/update/delete requests
}
41
42 func (r *Registry) Initialize() {
43         r.register = make(map[uint32]*Subscription)
44         var i uint32
45         for i = 0; i < 65535; i++ {
46                 r.subIds = append(r.subIds, i+1)
47         }
48 }
49
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
51         r.mutex.Lock()
52         defer r.mutex.Unlock()
53
54         resp := models.SubscriptionList{}
55         for _, subs := range r.register {
56                 subs.mutex.Lock()
57                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
58                 subs.mutex.Unlock()
59         }
60         return resp, nil
61 }
62
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64         if len(r.subIds) > 0 {
65                 subId := r.subIds[0]
66                 r.subIds = r.subIds[1:]
67                 if _, ok := r.register[subId]; ok == true {
68                         r.subIds = append(r.subIds, subId)
69                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
70                 }
71                 subs := &Subscription{
72                         registry:  r,
73                         Meid:      trans.Meid,
74                         SubReqMsg: subReqMsg,
75                         valid:     true,
76                 }
77                 subs.ReqId.Id = 123
78                 subs.ReqId.InstanceId = subId
79
80                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
82                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
83                 }
84
85                 return subs, nil
86         }
87         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
88 }
89
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
91
92         for _, subs := range r.register {
93                 if subs.IsMergeable(trans, subReqMsg) {
94
95                         //
96                         // check if there has been race conditions
97                         //
98                         subs.mutex.Lock()
99                         //subs has been set to invalid
100                         if subs.valid == false {
101                                 subs.mutex.Unlock()
102                                 continue
103                         }
104                         // If size is zero, entry is to be deleted
105                         if subs.EpList.Size() == 0 {
106                                 subs.mutex.Unlock()
107                                 continue
108                         }
109                         // try to add to endpointlist.
110                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
111                                 subs.mutex.Unlock()
112                                 continue
113                         }
114                         subs.mutex.Unlock()
115
116                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
117                         return subs
118                 }
119         }
120         return nil
121 }
122
123 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
124         var err error
125         var newAlloc bool
126         r.mutex.Lock()
127         defer r.mutex.Unlock()
128
129         //
130         // Check validity of subscription action types
131         //
132         actionType, err := r.CheckActionTypes(subReqMsg)
133         if err != nil {
134                 xapp.Logger.Debug("CREATE %s", err)
135                 return nil, err
136         }
137
138         //
139         // Find possible existing Policy subscription
140         //
141         if actionType == e2ap.E2AP_ActionTypePolicy {
142                 if subs, ok := r.register[trans.GetSubId()]; ok {
143                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
144                         // Update message data to subscription
145                         subs.SubReqMsg = subReqMsg
146                         subs.SetCachedResponse(nil, true)
147                         return subs, nil
148                 }
149         }
150
151         subs := r.findExistingSubs(trans, subReqMsg)
152         if subs == nil {
153                 subs, err = r.allocateSubs(trans, subReqMsg)
154                 if err != nil {
155                         return nil, err
156                 }
157                 newAlloc = true
158         }
159
160         //
161         // Add to subscription
162         //
163         subs.mutex.Lock()
164         defer subs.mutex.Unlock()
165
166         epamount := subs.EpList.Size()
167
168         r.mutex.Unlock()
169         //
170         // Subscription route updates
171         //
172         if epamount == 1 {
173                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
174                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
175         } else {
176                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
177                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
178         }
179         r.mutex.Lock()
180
181         if err != nil {
182                 if newAlloc {
183                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
184                 }
185                 return nil, err
186         }
187
188         if newAlloc {
189                 r.register[subs.ReqId.InstanceId] = subs
190         }
191         xapp.Logger.Debug("CREATE %s", subs.String())
192         xapp.Logger.Debug("Registry: substable=%v", r.register)
193         return subs, nil
194 }
195
196 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
197         var reportFound bool = false
198         var policyFound bool = false
199         var insertFound bool = false
200
201         for _, acts := range subReqMsg.ActionSetups {
202                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
203                         reportFound = true
204                 }
205                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
206                         policyFound = true
207                 }
208                 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
209                         insertFound = true
210                 }
211         }
212         if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
213                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
214         }
215         if reportFound == true {
216                 return e2ap.E2AP_ActionTypeReport, nil
217         }
218         if policyFound == true {
219                 return e2ap.E2AP_ActionTypePolicy, nil
220         }
221         if insertFound == true {
222                 return e2ap.E2AP_ActionTypeInsert, nil
223         }
224         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
225 }
226
// RemoveFromSubscription removes the endpoint of trans from subs and starts
// the asynchronous route clean-up that follows.
//
// With the registry and subscription locks held, the endpoint is deleted from
// subs.EpList. If the endpoint was not in the list, nothing else happens.
// Otherwise a goroutine is started which, after sleeping for waitRouteClean
// (when positive), either
//   - sends a route delete, removes subs from the register and returns its
//     instance id to the free pool, when no endpoints were left right after
//     the removal, or
//   - sends a route update when the subscription still has endpoints.
//
// The function itself always returns nil, without waiting for the goroutine.
// Any result of the route manager calls made by the goroutine is discarded.
//
// TODO: Works with concurrent calls, but check if can be improved
func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {

	r.mutex.Lock()
	defer r.mutex.Unlock()
	subs.mutex.Lock()
	defer subs.mutex.Unlock()

	// Snapshot taken under the locks; the goroutine below works with these
	// values rather than re-reading them (except where noted).
	delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
	epamount := subs.EpList.Size()
	subId := subs.ReqId.InstanceId

	// The endpoint was not part of this subscription: nothing to clean up.
	if delStatus == false {
		return nil
	}

	go func() {
		if waitRouteClean > 0 {
			time.Sleep(waitRouteClean)
		}

		// NOTE(review): subs.mutex is taken here before r.mutex (below), the
		// opposite order to the function entry above — looks like a potential
		// lock-order inversion under concurrent calls; confirm.
		subs.mutex.Lock()
		defer subs.mutex.Unlock()
		xapp.Logger.Info("CLEAN %s", subs.String())

		if epamount == 0 {
			//
			// Subscription route delete
			//
			// The delete request carries only the endpoint that was just
			// removed, in a temporary list.
			tmpList := xapp.RmrEndpointList{}
			tmpList.AddEndpoint(trans.GetEndpoint())
			subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
			r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)

			//
			// Subscription release
			//
			r.mutex.Lock()
			defer r.mutex.Unlock()

			if _, ok := r.register[subId]; ok {
				xapp.Logger.Debug("RELEASE %s", subs.String())
				delete(r.register, subId)
				xapp.Logger.Debug("Registry: substable=%v", r.register)
			}
			// The id goes back to the free pool whether or not an entry was
			// found in the register.
			r.subIds = append(r.subIds, subId)

		} else if subs.EpList.Size() > 0 {
			//
			// Subscription route updates
			//
			// The size is re-read here instead of using the earlier snapshot.
			subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
			r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
		}

	}()

	return nil
}
286
287 func (r *Registry) GetSubscription(subId uint32) *Subscription {
288         r.mutex.Lock()
289         defer r.mutex.Unlock()
290         if _, ok := r.register[subId]; ok {
291                 return r.register[subId]
292         }
293         return nil
294 }
295
296 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
297         r.mutex.Lock()
298         defer r.mutex.Unlock()
299         for _, subId := range subIds {
300                 if _, ok := r.register[subId]; ok {
301                         return r.register[subId], nil
302                 }
303         }
304         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
305 }