RIC-79 intial implementation to fetch subscriptions via rest
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "sync"
28         "time"
29 )
30
31 //-----------------------------------------------------------------------------
32 //
33 //-----------------------------------------------------------------------------
34
35 type Registry struct {
36         mutex       sync.Mutex
37         register    map[uint32]*Subscription
38         subIds      []uint32
39         rtmgrClient *RtmgrClient
40 }
41
42 func (r *Registry) Initialize() {
43         r.register = make(map[uint32]*Subscription)
44         var i uint32
45         for i = 0; i < 65535; i++ {
46                 r.subIds = append(r.subIds, i+1)
47         }
48 }
49
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
51         r.mutex.Lock()
52         defer r.mutex.Unlock()
53
54         resp := models.SubscriptionList{}
55         for _, subs := range r.register {
56                 subs.mutex.Lock()
57                 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.Seq), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
58                 subs.mutex.Unlock()
59         }
60         return resp, nil
61 }
62
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64         if len(r.subIds) > 0 {
65                 sequenceNumber := r.subIds[0]
66                 r.subIds = r.subIds[1:]
67                 if _, ok := r.register[sequenceNumber]; ok == true {
68                         r.subIds = append(r.subIds, sequenceNumber)
69                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
70                 }
71                 subs := &Subscription{
72                         registry:  r,
73                         Meid:      trans.Meid,
74                         SubReqMsg: subReqMsg,
75                         valid:     true,
76                 }
77                 subs.ReqId.Id = 123
78                 subs.ReqId.Seq = sequenceNumber
79
80                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81                         r.subIds = append(r.subIds, subs.ReqId.Seq)
82                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
83                 }
84
85                 return subs, nil
86         }
87         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
88 }
89
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
91
92         for _, subs := range r.register {
93                 if subs.IsMergeable(trans, subReqMsg) {
94
95                         //
96                         // check if there has been race conditions
97                         //
98                         subs.mutex.Lock()
99                         //subs has been set to invalid
100                         if subs.valid == false {
101                                 subs.mutex.Unlock()
102                                 continue
103                         }
104                         // If size is zero, entry is to be deleted
105                         if subs.EpList.Size() == 0 {
106                                 subs.mutex.Unlock()
107                                 continue
108                         }
109                         // try to add to endpointlist.
110                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
111                                 subs.mutex.Unlock()
112                                 continue
113                         }
114                         subs.mutex.Unlock()
115
116                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
117                         return subs
118                 }
119         }
120         return nil
121 }
122
123 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
124         var err error
125         var newAlloc bool
126         r.mutex.Lock()
127         defer r.mutex.Unlock()
128
129         //
130         // Check validity of subscription action types
131         //
132         actionType, err := r.CheckActionTypes(subReqMsg)
133         if err != nil {
134                 xapp.Logger.Debug("CREATE %s", err)
135                 return nil, err
136         }
137
138         //
139         // Find possible existing Policy subscription
140         //
141         if actionType == e2ap.E2AP_ActionTypePolicy {
142                 if subs, ok := r.register[trans.GetSubId()]; ok {
143                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
144                         subs.SetCachedResponse(nil, true)
145                         return subs, nil
146                 }
147         }
148
149         subs := r.findExistingSubs(trans, subReqMsg)
150         if subs == nil {
151                 subs, err = r.allocateSubs(trans, subReqMsg)
152                 if err != nil {
153                         return nil, err
154                 }
155                 newAlloc = true
156         }
157
158         //
159         // Add to subscription
160         //
161         subs.mutex.Lock()
162         defer subs.mutex.Unlock()
163
164         epamount := subs.EpList.Size()
165
166         r.mutex.Unlock()
167         //
168         // Subscription route updates
169         //
170         if epamount == 1 {
171                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
172                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
173         } else {
174                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
175                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
176         }
177         r.mutex.Lock()
178
179         if err != nil {
180                 if newAlloc {
181                         r.subIds = append(r.subIds, subs.ReqId.Seq)
182                 }
183                 return nil, err
184         }
185
186         if newAlloc {
187                 r.register[subs.ReqId.Seq] = subs
188         }
189         xapp.Logger.Debug("CREATE %s", subs.String())
190         xapp.Logger.Debug("Registry: substable=%v", r.register)
191         return subs, nil
192 }
193
194 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
195         var reportFound bool = false
196         var policyFound bool = false
197
198         for _, acts := range subReqMsg.ActionSetups {
199                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
200                         reportFound = true
201                 }
202                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
203                         policyFound = true
204                 }
205         }
206         if reportFound == true && policyFound == true {
207                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
208         }
209         if reportFound == true {
210                 return e2ap.E2AP_ActionTypeReport, nil
211         }
212         if policyFound == true {
213                 return e2ap.E2AP_ActionTypePolicy, nil
214         }
215         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
216 }
217
218 // TODO: Works with concurrent calls, but check if can be improved
219 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
220
221         r.mutex.Lock()
222         defer r.mutex.Unlock()
223         subs.mutex.Lock()
224         defer subs.mutex.Unlock()
225
226         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
227         epamount := subs.EpList.Size()
228         seqId := subs.ReqId.Seq
229
230         if delStatus == false {
231                 return nil
232         }
233
234         go func() {
235                 if waitRouteClean > 0 {
236                         time.Sleep(waitRouteClean)
237                 }
238
239                 subs.mutex.Lock()
240                 defer subs.mutex.Unlock()
241                 xapp.Logger.Info("CLEAN %s", subs.String())
242
243                 if epamount == 0 {
244                         //
245                         // Subscription route delete
246                         //
247                         tmpList := RmrEndpointList{}
248                         tmpList.AddEndpoint(trans.GetEndpoint())
249                         subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
250                         r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
251
252                         //
253                         // Subscription release
254                         //
255                         r.mutex.Lock()
256                         defer r.mutex.Unlock()
257
258                         if _, ok := r.register[seqId]; ok {
259                                 xapp.Logger.Debug("RELEASE %s", subs.String())
260                                 delete(r.register, seqId)
261                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
262                         }
263                         r.subIds = append(r.subIds, seqId)
264
265                 } else if subs.EpList.Size() > 0 {
266                         //
267                         // Subscription route updates
268                         //
269                         subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
270                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
271                 }
272
273         }()
274
275         return nil
276 }
277
278 func (r *Registry) GetSubscription(sn uint32) *Subscription {
279         r.mutex.Lock()
280         defer r.mutex.Unlock()
281         if _, ok := r.register[sn]; ok {
282                 return r.register[sn]
283         }
284         return nil
285 }
286
287 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
288         r.mutex.Lock()
289         defer r.mutex.Unlock()
290         for _, id := range ids {
291                 if _, ok := r.register[id]; ok {
292                         return r.register[id], nil
293                 }
294         }
295         return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
296 }