f24dccdded23b9970abe238b272e89a61ee602e9
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
26         "sync"
27         "time"
28 )
29
30 //-----------------------------------------------------------------------------
31 //
32 //-----------------------------------------------------------------------------
33
34 type Registry struct {
35         mutex       sync.Mutex
36         register    map[uint32]*Subscription
37         subIds      []uint32
38         rtmgrClient *RtmgrClient
39 }
40
41 func (r *Registry) Initialize() {
42         r.register = make(map[uint32]*Subscription)
43         var i uint32
44         for i = 0; i < 65535; i++ {
45                 r.subIds = append(r.subIds, i+1)
46         }
47 }
48
49 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
50         if len(r.subIds) > 0 {
51                 sequenceNumber := r.subIds[0]
52                 r.subIds = r.subIds[1:]
53                 if _, ok := r.register[sequenceNumber]; ok == true {
54                         r.subIds = append(r.subIds, sequenceNumber)
55                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
56                 }
57                 subs := &Subscription{
58                         registry:  r,
59                         Meid:      trans.Meid,
60                         SubReqMsg: subReqMsg,
61                         valid:     true,
62                 }
63                 subs.ReqId.Id = 123
64                 subs.ReqId.Seq = sequenceNumber
65
66                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
67                         r.subIds = append(r.subIds, subs.ReqId.Seq)
68                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
69                 }
70
71                 return subs, nil
72         }
73         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
74 }
75
76 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
77
78         for _, subs := range r.register {
79                 if subs.IsMergeable(trans, subReqMsg) {
80
81                         //
82                         // check if there has been race conditions
83                         //
84                         subs.mutex.Lock()
85                         //subs has been set to invalid
86                         if subs.valid == false {
87                                 subs.mutex.Unlock()
88                                 continue
89                         }
90                         // If size is zero, entry is to be deleted
91                         if subs.EpList.Size() == 0 {
92                                 subs.mutex.Unlock()
93                                 continue
94                         }
95                         // try to add to endpointlist.
96                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
97                                 subs.mutex.Unlock()
98                                 continue
99                         }
100                         subs.mutex.Unlock()
101
102                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
103                         return subs
104                 }
105         }
106         return nil
107 }
108
109 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
110         var err error
111         var newAlloc bool
112         r.mutex.Lock()
113         defer r.mutex.Unlock()
114
115         //
116         // Check validity of subscription action types
117         //
118         actionType, err := r.CheckActionTypes(subReqMsg)
119         if err != nil {
120                 xapp.Logger.Debug("CREATE %s", err)
121                 return nil, err
122         }
123
124         //
125         // Find possible existing Policy subscription
126         //
127         if actionType == e2ap.E2AP_ActionTypePolicy {
128                 if subs, ok := r.register[trans.GetSubId()]; ok {
129                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
130                         subs.SetCachedResponse(nil, true)
131                         return subs, nil
132                 }
133         }
134
135         subs := r.findExistingSubs(trans, subReqMsg)
136         if subs == nil {
137                 subs, err = r.allocateSubs(trans, subReqMsg)
138                 if err != nil {
139                         return nil, err
140                 }
141                 newAlloc = true
142         }
143
144         //
145         // Add to subscription
146         //
147         subs.mutex.Lock()
148         defer subs.mutex.Unlock()
149
150         epamount := subs.EpList.Size()
151
152         r.mutex.Unlock()
153         //
154         // Subscription route updates
155         //
156         if epamount == 1 {
157                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
158                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
159         } else {
160                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
161                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
162         }
163         r.mutex.Lock()
164
165         if err != nil {
166                 if newAlloc {
167                         r.subIds = append(r.subIds, subs.ReqId.Seq)
168                 }
169                 return nil, err
170         }
171
172         if newAlloc {
173                 r.register[subs.ReqId.Seq] = subs
174         }
175         xapp.Logger.Debug("CREATE %s", subs.String())
176         xapp.Logger.Debug("Registry: substable=%v", r.register)
177         return subs, nil
178 }
179
180 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
181         var reportFound bool = false
182         var policyFound bool = false
183
184         for _, acts := range subReqMsg.ActionSetups {
185                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
186                         reportFound = true
187                 }
188                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
189                         policyFound = true
190                 }
191         }
192         if reportFound == true && policyFound == true {
193                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
194         }
195         if reportFound == true {
196                 return e2ap.E2AP_ActionTypeReport, nil
197         }
198         if policyFound == true {
199                 return e2ap.E2AP_ActionTypePolicy, nil
200         }
201         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
202 }
203
204 // TODO: Works with concurrent calls, but check if can be improved
205 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
206
207         r.mutex.Lock()
208         defer r.mutex.Unlock()
209         subs.mutex.Lock()
210         defer subs.mutex.Unlock()
211
212         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
213         epamount := subs.EpList.Size()
214         seqId := subs.ReqId.Seq
215
216         if delStatus == false {
217                 return nil
218         }
219
220         go func() {
221                 if waitRouteClean > 0 {
222                         time.Sleep(waitRouteClean)
223                 }
224
225                 subs.mutex.Lock()
226                 defer subs.mutex.Unlock()
227                 xapp.Logger.Info("CLEAN %s", subs.String())
228
229                 if epamount == 0 {
230                         //
231                         // Subscription route delete
232                         //
233                         tmpList := RmrEndpointList{}
234                         tmpList.AddEndpoint(trans.GetEndpoint())
235                         subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
236                         r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
237
238                         //
239                         // Subscription release
240                         //
241                         r.mutex.Lock()
242                         defer r.mutex.Unlock()
243
244                         if _, ok := r.register[seqId]; ok {
245                                 xapp.Logger.Debug("RELEASE %s", subs.String())
246                                 delete(r.register, seqId)
247                                 xapp.Logger.Debug("Registry: substable=%v", r.register)
248                         }
249                         r.subIds = append(r.subIds, seqId)
250
251                 } else if subs.EpList.Size() > 0 {
252                         //
253                         // Subscription route updates
254                         //
255                         subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
256                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
257                 }
258
259         }()
260
261         return nil
262 }
263
264 func (r *Registry) GetSubscription(sn uint32) *Subscription {
265         r.mutex.Lock()
266         defer r.mutex.Unlock()
267         if _, ok := r.register[sn]; ok {
268                 return r.register[sn]
269         }
270         return nil
271 }
272
273 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
274         r.mutex.Lock()
275         defer r.mutex.Unlock()
276         for _, id := range ids {
277                 if _, ok := r.register[id]; ok {
278                         return r.register[id], nil
279                 }
280         }
281         return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
282 }