396daded93335e834879b7442a2f6a0f0786e5d5
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         "sync"
29         "time"
30 )
31
32 //-----------------------------------------------------------------------------
33 //
34 //-----------------------------------------------------------------------------
35
36 type Registry struct {
37         mutex       sync.Mutex
38         register    map[uint32]*Subscription
39         subIds      []uint32
40         rtmgrClient *RtmgrClient
41 }
42
43 func (r *Registry) Initialize() {
44         r.register = make(map[uint32]*Subscription)
45         var i uint32
46         for i = 0; i < 65535; i++ {
47                 r.subIds = append(r.subIds, i+1)
48         }
49 }
50
51 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52         r.mutex.Lock()
53         defer r.mutex.Unlock()
54
55         resp := models.SubscriptionList{}
56         for _, subs := range r.register {
57                 subs.mutex.Lock()
58                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
59                 subs.mutex.Unlock()
60         }
61         return resp, nil
62 }
63
64 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
65         if len(r.subIds) > 0 {
66                 subId := r.subIds[0]
67                 r.subIds = r.subIds[1:]
68                 if _, ok := r.register[subId]; ok == true {
69                         r.subIds = append(r.subIds, subId)
70                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71                 }
72                 subs := &Subscription{
73                         registry:  r,
74                         Meid:      trans.Meid,
75                         SubReqMsg: subReqMsg,
76                         valid:     true,
77                 }
78                 subs.ReqId.Id = 123
79                 subs.ReqId.InstanceId = subId
80
81                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
82                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
83                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
84                 }
85
86                 return subs, nil
87         }
88         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
89 }
90
91 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
92
93         for _, subs := range r.register {
94                 if subs.IsMergeable(trans, subReqMsg) {
95
96                         //
97                         // check if there has been race conditions
98                         //
99                         subs.mutex.Lock()
100                         //subs has been set to invalid
101                         if subs.valid == false {
102                                 subs.mutex.Unlock()
103                                 continue
104                         }
105                         // If size is zero, entry is to be deleted
106                         if subs.EpList.Size() == 0 {
107                                 subs.mutex.Unlock()
108                                 continue
109                         }
110                         // try to add to endpointlist.
111                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
112                                 subs.mutex.Unlock()
113                                 continue
114                         }
115                         subs.mutex.Unlock()
116
117                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
118                         return subs
119                 }
120         }
121         return nil
122 }
123
124 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
125         var err error
126         var newAlloc bool
127         r.mutex.Lock()
128         defer r.mutex.Unlock()
129
130         //
131         // Check validity of subscription action types
132         //
133         actionType, err := r.CheckActionTypes(subReqMsg)
134         if err != nil {
135                 xapp.Logger.Debug("CREATE %s", err)
136                 return nil, err
137         }
138
139         //
140         // Find possible existing Policy subscription
141         //
142         if actionType == e2ap.E2AP_ActionTypePolicy {
143                 if subs, ok := r.register[trans.GetSubId()]; ok {
144                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
145                         // Update message data to subscription
146                         subs.SubReqMsg = subReqMsg
147                         subs.SetCachedResponse(nil, true)
148                         return subs, nil
149                 }
150         }
151
152         subs := r.findExistingSubs(trans, subReqMsg)
153         if subs == nil {
154                 subs, err = r.allocateSubs(trans, subReqMsg)
155                 if err != nil {
156                         return nil, err
157                 }
158                 newAlloc = true
159         }
160
161         //
162         // Add to subscription
163         //
164         subs.mutex.Lock()
165         defer subs.mutex.Unlock()
166
167         epamount := subs.EpList.Size()
168
169         r.mutex.Unlock()
170         //
171         // Subscription route updates
172         //
173         if epamount == 1 {
174                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
175                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
176         } else {
177                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
178                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
179         }
180         r.mutex.Lock()
181
182         if err != nil {
183                 if newAlloc {
184                         r.subIds = append(r.subIds, subs.ReqId.InstanceId)
185                 }
186                 return nil, err
187         }
188
189         if newAlloc {
190                 r.register[subs.ReqId.InstanceId] = subs
191         }
192         xapp.Logger.Debug("CREATE %s", subs.String())
193         xapp.Logger.Debug("Registry: substable=%v", r.register)
194         return subs, nil
195 }
196
197 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
198         var reportFound bool = false
199         var policyFound bool = false
200
201         for _, acts := range subReqMsg.ActionSetups {
202                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
203                         reportFound = true
204                 }
205                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
206                         policyFound = true
207                 }
208         }
209         if reportFound == true && policyFound == true {
210                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
211         }
212         if reportFound == true {
213                 return e2ap.E2AP_ActionTypeReport, nil
214         }
215         if policyFound == true {
216                 return e2ap.E2AP_ActionTypePolicy, nil
217         }
218         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
219 }
220
221 // TODO: Works with concurrent calls, but check if can be improved
222 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
223
224         r.mutex.Lock()
225         defer r.mutex.Unlock()
226         subs.mutex.Lock()
227         defer subs.mutex.Unlock()
228
229         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
230         epamount := subs.EpList.Size()
231         subId := subs.ReqId.InstanceId
232
233         if delStatus == false {
234                 return nil
235         }
236
237         go func() {
238                 if waitRouteClean > 0 {
239                         time.Sleep(waitRouteClean)
240                 }
241
242                 subs.mutex.Lock()
243                 defer subs.mutex.Unlock()
244                 xapp.Logger.Info("CLEAN %s", subs.String())
245
246                 if epamount == 0 {
247                         //
248                         // Subscription route delete
249                         //
250                         tmpList := xapptweaks.RmrEndpointList{}
251                         tmpList.AddEndpoint(trans.GetEndpoint())
252                         subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
253                         r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
254
255                         //
256                         // Subscription release
257                         //
258                         r.mutex.Lock()
259                         defer r.mutex.Unlock()
260
261                         if _, ok := r.register[subId]; ok {
262                                 xapp.Logger.Debug("RELEASE %s", subs.String())
263                                 delete(r.register, subId)
264                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
265                         }
266                         r.subIds = append(r.subIds, subId)
267
268                 } else if subs.EpList.Size() > 0 {
269                         //
270                         // Subscription route updates
271                         //
272                         subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
273                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
274                 }
275
276         }()
277
278         return nil
279 }
280
281 func (r *Registry) GetSubscription(subId uint32) *Subscription {
282         r.mutex.Lock()
283         defer r.mutex.Unlock()
284         if _, ok := r.register[subId]; ok {
285                 return r.register[subId]
286         }
287         return nil
288 }
289
290 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
291         r.mutex.Lock()
292         defer r.mutex.Unlock()
293         for _, subId := range subIds {
294                 if _, ok := r.register[subId]; ok {
295                         return r.register[subId], nil
296                 }
297         }
298         return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
299 }