Merge newe2 into master
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         "sync"
29         "time"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
36 type Registry struct {
37         mutex       sync.Mutex
38         register    map[uint32]*Subscription
39         subIds      []uint32
40         rtmgrClient *RtmgrClient
41 }
42
43 func (r *Registry) Initialize() {
44         r.register = make(map[uint32]*Subscription)
45         var i uint32
46         for i = 0; i < 65535; i++ {
47                 r.subIds = append(r.subIds, i+1)
48         }
49 }
50
51 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52         r.mutex.Lock()
53         defer r.mutex.Unlock()
54
55         resp := models.SubscriptionList{}
56         for _, subs := range r.register {
57                 subs.mutex.Lock()
58                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
59                 subs.mutex.Unlock()
60         }
61         return resp, nil
62 }
63
64 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
65         if len(r.subIds) > 0 {
66                 subId := r.subIds[0]
67                 r.subIds = r.subIds[1:]
68                 if _, ok := r.register[subId]; ok == true {
69                         r.subIds = append(r.subIds, subId)
70                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71                 }
72                 subs := &Subscription{
73                         registry:  r,
74                         Meid:      trans.Meid,
75                         SubReqMsg: subReqMsg,
76                         valid:     true,
77                 }
78                 subs.ReqId.Id = 123
79                 subs.ReqId.InstanceId = subId
80
81                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
82                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
83                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
84                 }
85
86                 return subs, nil
87         }
88         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
89 }
90
91 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
92
93         for _, subs := range r.register {
94                 if subs.IsMergeable(trans, subReqMsg) {
95
96                         //
97                         // check if there has been race conditions
98                         //
99                         subs.mutex.Lock()
100                         //subs has been set to invalid
101                         if subs.valid == false {
102                                 subs.mutex.Unlock()
103                                 continue
104                         }
105                         // If size is zero, entry is to be deleted
106                         if subs.EpList.Size() == 0 {
107                                 subs.mutex.Unlock()
108                                 continue
109                         }
110                         // try to add to endpointlist.
111                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
112                                 subs.mutex.Unlock()
113                                 continue
114                         }
115                         subs.mutex.Unlock()
116
117                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
118                         return subs
119                 }
120         }
121         return nil
122 }
123
124 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
125         var err error
126         var newAlloc bool
127         r.mutex.Lock()
128         defer r.mutex.Unlock()
129
130         //
131         // Check validity of subscription action types
132         //
133         actionType, err := r.CheckActionTypes(subReqMsg)
134         if err != nil {
135                 xapp.Logger.Debug("CREATE %s", err)
136                 return nil, err
137         }
138
139         //
140         // Find possible existing Policy subscription
141         //
142         if actionType == e2ap.E2AP_ActionTypePolicy {
143                 if subs, ok := r.register[trans.GetSubId()]; ok {
144                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
145                         subs.SetCachedResponse(nil, true)
146                         return subs, nil
147                 }
148         }
149
150         subs := r.findExistingSubs(trans, subReqMsg)
151         if subs == nil {
152                 subs, err = r.allocateSubs(trans, subReqMsg)
153                 if err != nil {
154                         return nil, err
155                 }
156                 newAlloc = true
157         }
158
159         //
160         // Add to subscription
161         //
162         subs.mutex.Lock()
163         defer subs.mutex.Unlock()
164
165         epamount := subs.EpList.Size()
166
167         r.mutex.Unlock()
168         //
169         // Subscription route updates
170         //
171         if epamount == 1 {
172                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
173                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
174         } else {
175                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
176                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
177         }
178         r.mutex.Lock()
179
180         if err != nil {
181                 if newAlloc {
182                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
183                 }
184                 return nil, err
185         }
186
187         if newAlloc {
188                 r.register[subs.ReqId.InstanceId] = subs
189         }
190         xapp.Logger.Debug("CREATE %s", subs.String())
191         xapp.Logger.Debug("Registry: substable=%v", r.register)
192         return subs, nil
193 }
194
195 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
196         var reportFound bool = false
197         var policyFound bool = false
198
199         for _, acts := range subReqMsg.ActionSetups {
200                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
201                         reportFound = true
202                 }
203                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
204                         policyFound = true
205                 }
206         }
207         if reportFound == true && policyFound == true {
208                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
209         }
210         if reportFound == true {
211                 return e2ap.E2AP_ActionTypeReport, nil
212         }
213         if policyFound == true {
214                 return e2ap.E2AP_ActionTypePolicy, nil
215         }
216         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
217 }
218
219 // TODO: Works with concurrent calls, but check if can be improved
220 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
221
222         r.mutex.Lock()
223         defer r.mutex.Unlock()
224         subs.mutex.Lock()
225         defer subs.mutex.Unlock()
226
227         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
228         epamount := subs.EpList.Size()
229         subId := subs.ReqId.InstanceId
230
231         if delStatus == false {
232                 return nil
233         }
234
235         go func() {
236                 if waitRouteClean > 0 {
237                         time.Sleep(waitRouteClean)
238                 }
239
240                 subs.mutex.Lock()
241                 defer subs.mutex.Unlock()
242                 xapp.Logger.Info("CLEAN %s", subs.String())
243
244                 if epamount == 0 {
245                         //
246                         // Subscription route delete
247                         //
248                         tmpList := xapptweaks.RmrEndpointList{}
249                         tmpList.AddEndpoint(trans.GetEndpoint())
250                         subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
251                         r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
252
253                         //
254                         // Subscription release
255                         //
256                         r.mutex.Lock()
257                         defer r.mutex.Unlock()
258
259                         if _, ok := r.register[subId]; ok {
260                                 xapp.Logger.Debug("RELEASE %s", subs.String())
261                                 delete(r.register, subId)
262                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
263                         }
264                         r.subIds = append(r.subIds, subId)
265
266                 } else if subs.EpList.Size() > 0 {
267                         //
268                         // Subscription route updates
269                         //
270                         subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
271                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
272                 }
273
274         }()
275
276         return nil
277 }
278
279 func (r *Registry) GetSubscription(subId uint32) *Subscription {
280         r.mutex.Lock()
281         defer r.mutex.Unlock()
282         if _, ok := r.register[subId]; ok {
283                 return r.register[subId]
284         }
285         return nil
286 }
287
288 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
289         r.mutex.Lock()
290         defer r.mutex.Unlock()
291         for _, subId := range subIds {
292                 if _, ok := r.register[subId]; ok {
293                         return r.register[subId], nil
294                 }
295         }
296         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
297 }