275d572d12922418f25c9ee03886b0686bb66ea0
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "sync"
28         "time"
29 )
30
31 //-----------------------------------------------------------------------------
32 //
33 //-----------------------------------------------------------------------------
34
35 type Registry struct {
36         mutex       sync.Mutex
37         register    map[uint32]*Subscription
38         subIds      []uint32
39         rtmgrClient *RtmgrClient
40 }
41
42 func (r *Registry) Initialize() {
43         r.register = make(map[uint32]*Subscription)
44         var i uint32
45         for i = 0; i < 65535; i++ {
46                 r.subIds = append(r.subIds, i+1)
47         }
48 }
49
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
51         r.mutex.Lock()
52         defer r.mutex.Unlock()
53
54         resp := models.SubscriptionList{}
55         for _, subs := range r.register {
56                 subs.mutex.Lock()
57                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
58                 subs.mutex.Unlock()
59         }
60         return resp, nil
61 }
62
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64         if len(r.subIds) > 0 {
65                 subId := r.subIds[0]
66                 r.subIds = r.subIds[1:]
67                 if _, ok := r.register[subId]; ok == true {
68                         r.subIds = append(r.subIds, subId)
69                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
70                 }
71                 subs := &Subscription{
72                         registry:  r,
73                         Meid:      trans.Meid,
74                         SubReqMsg: subReqMsg,
75                         valid:     true,
76                 }
77                 subs.ReqId.Id = 123
78                 subs.ReqId.InstanceId = subId
79
80                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
82                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
83                 }
84
85                 return subs, nil
86         }
87         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
88 }
89
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
91
92         for _, subs := range r.register {
93                 if subs.IsMergeable(trans, subReqMsg) {
94
95                         //
96                         // check if there has been race conditions
97                         //
98                         subs.mutex.Lock()
99                         //subs has been set to invalid
100                         if subs.valid == false {
101                                 subs.mutex.Unlock()
102                                 continue
103                         }
104                         // If size is zero, entry is to be deleted
105                         if subs.EpList.Size() == 0 {
106                                 subs.mutex.Unlock()
107                                 continue
108                         }
109                         // try to add to endpointlist.
110                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
111                                 subs.mutex.Unlock()
112                                 continue
113                         }
114                         subs.mutex.Unlock()
115
116                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
117                         return subs
118                 }
119         }
120         return nil
121 }
122
123 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
124         var err error
125         var newAlloc bool
126         r.mutex.Lock()
127         defer r.mutex.Unlock()
128
129         //
130         // Check validity of subscription action types
131         //
132         actionType, err := r.CheckActionTypes(subReqMsg)
133         if err != nil {
134                 xapp.Logger.Debug("CREATE %s", err)
135                 return nil, err
136         }
137
138         //
139         // Find possible existing Policy subscription
140         //
141         if actionType == e2ap.E2AP_ActionTypePolicy {
142                 if subs, ok := r.register[trans.GetSubId()]; ok {
143                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
144                         // Update message data to subscription
145                         subs.SubReqMsg = subReqMsg
146                         subs.SetCachedResponse(nil, true)
147                         return subs, nil
148                 }
149         }
150
151         subs := r.findExistingSubs(trans, subReqMsg)
152         if subs == nil {
153                 subs, err = r.allocateSubs(trans, subReqMsg)
154                 if err != nil {
155                         return nil, err
156                 }
157                 newAlloc = true
158         }
159
160         //
161         // Add to subscription
162         //
163         subs.mutex.Lock()
164         defer subs.mutex.Unlock()
165
166         epamount := subs.EpList.Size()
167
168         r.mutex.Unlock()
169         //
170         // Subscription route updates
171         //
172         if epamount == 1 {
173                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
174                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
175         } else {
176                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
177                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
178         }
179         r.mutex.Lock()
180
181         if err != nil {
182                 if newAlloc {
183                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
184                 }
185                 return nil, err
186         }
187
188         if newAlloc {
189                 r.register[subs.ReqId.InstanceId] = subs
190         }
191         xapp.Logger.Debug("CREATE %s", subs.String())
192         xapp.Logger.Debug("Registry: substable=%v", r.register)
193         return subs, nil
194 }
195
196 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
197         var reportFound bool = false
198         var policyFound bool = false
199
200         for _, acts := range subReqMsg.ActionSetups {
201                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
202                         reportFound = true
203                 }
204                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
205                         policyFound = true
206                 }
207         }
208         if reportFound == true && policyFound == true {
209                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
210         }
211         if reportFound == true {
212                 return e2ap.E2AP_ActionTypeReport, nil
213         }
214         if policyFound == true {
215                 return e2ap.E2AP_ActionTypePolicy, nil
216         }
217         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
218 }
219
220 // TODO: Works with concurrent calls, but check if can be improved
221 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
222
223         r.mutex.Lock()
224         defer r.mutex.Unlock()
225         subs.mutex.Lock()
226         defer subs.mutex.Unlock()
227
228         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
229         epamount := subs.EpList.Size()
230         subId := subs.ReqId.InstanceId
231
232         if delStatus == false {
233                 return nil
234         }
235
236         go func() {
237                 if waitRouteClean > 0 {
238                         time.Sleep(waitRouteClean)
239                 }
240
241                 subs.mutex.Lock()
242                 defer subs.mutex.Unlock()
243                 xapp.Logger.Info("CLEAN %s", subs.String())
244
245                 if epamount == 0 {
246                         //
247                         // Subscription route delete
248                         //
249                         tmpList := xapp.RmrEndpointList{}
250                         tmpList.AddEndpoint(trans.GetEndpoint())
251                         subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
252                         r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
253
254                         //
255                         // Subscription release
256                         //
257                         r.mutex.Lock()
258                         defer r.mutex.Unlock()
259
260                         if _, ok := r.register[subId]; ok {
261                                 xapp.Logger.Debug("RELEASE %s", subs.String())
262                                 delete(r.register, subId)
263                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
264                         }
265                         r.subIds = append(r.subIds, subId)
266
267                 } else if subs.EpList.Size() > 0 {
268                         //
269                         // Subscription route updates
270                         //
271                         subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
272                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
273                 }
274
275         }()
276
277         return nil
278 }
279
280 func (r *Registry) GetSubscription(subId uint32) *Subscription {
281         r.mutex.Lock()
282         defer r.mutex.Unlock()
283         if _, ok := r.register[subId]; ok {
284                 return r.register[subId]
285         }
286         return nil
287 }
288
289 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
290         r.mutex.Lock()
291         defer r.mutex.Unlock()
292         for _, subId := range subIds {
293                 if _, ok := r.register[subId]; ok {
294                         return r.register[subId], nil
295                 }
296         }
297         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
298 }