Support for subscription action type Policy
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
26         "sync"
27         "time"
28 )
29
30 //-----------------------------------------------------------------------------
31 //
32 //-----------------------------------------------------------------------------
33
34 type Registry struct {
35         mutex       sync.Mutex
36         register    map[uint32]*Subscription
37         subIds      []uint32
38         rtmgrClient *RtmgrClient
39 }
40
41 func (r *Registry) Initialize() {
42         r.register = make(map[uint32]*Subscription)
43         var i uint32
44         for i = 0; i < 65535; i++ {
45                 r.subIds = append(r.subIds, i+1)
46         }
47 }
48
49 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
50         if len(r.subIds) > 0 {
51                 sequenceNumber := r.subIds[0]
52                 r.subIds = r.subIds[1:]
53                 if _, ok := r.register[sequenceNumber]; ok == true {
54                         r.subIds = append(r.subIds, sequenceNumber)
55                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
56                 }
57                 subs := &Subscription{
58                         registry:  r,
59                         Meid:      trans.Meid,
60                         SubReqMsg: subReqMsg,
61                         valid:     true,
62                 }
63                 subs.ReqId.Id = 123
64                 subs.ReqId.Seq = sequenceNumber
65
66                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
67                         r.subIds = append(r.subIds, subs.ReqId.Seq)
68                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
69                 }
70
71                 return subs, nil
72         }
73         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
74 }
75
76 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
77
78         for _, subs := range r.register {
79                 if subs.IsMergeable(trans, subReqMsg) {
80
81                         //
82                         // check if there has been race conditions
83                         //
84                         subs.mutex.Lock()
85                         //subs has been set to invalid
86                         if subs.valid == false {
87                                 subs.mutex.Unlock()
88                                 continue
89                         }
90                         // If size is zero, entry is to be deleted
91                         if subs.EpList.Size() == 0 {
92                                 subs.mutex.Unlock()
93                                 continue
94                         }
95                         // try to add to endpointlist.
96                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
97                                 subs.mutex.Unlock()
98                                 continue
99                         }
100                         subs.mutex.Unlock()
101
102                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
103                         return subs
104                 }
105         }
106         return nil
107 }
108
109 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
110         var err error
111         var newAlloc bool
112         r.mutex.Lock()
113         defer r.mutex.Unlock()
114
115         //
116         // Check validity of subscription action types
117         //
118         actionType, err := r.CheckActionTypes(subReqMsg)
119         if err != nil {
120                 xapp.Logger.Debug("CREATE %s", err)
121                 return nil, err
122         }
123
124         //
125         // Find possible existing Policy subscription
126         //
127         if actionType == e2ap.E2AP_ActionTypePolicy {
128                 if subs, ok := r.register[subReqMsg.RequestId.Seq]; ok {
129                         xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
130                         subs.SetCachedResponse(nil, true)
131                         return subs, nil
132                 }
133         }
134
135         subs := r.findExistingSubs(trans, subReqMsg)
136         if subs == nil {
137                 subs, err = r.allocateSubs(trans, subReqMsg)
138                 if err != nil {
139                         return nil, err
140                 }
141                 newAlloc = true
142         }
143
144         //
145         // Add to subscription
146         //
147         subs.mutex.Lock()
148         defer subs.mutex.Unlock()
149
150         epamount := subs.EpList.Size()
151
152         r.mutex.Unlock()
153         //
154         // Subscription route updates
155         //
156         if epamount == 1 {
157                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
158                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
159         } else {
160                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
161                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
162         }
163         r.mutex.Lock()
164
165         if err != nil {
166                 if newAlloc {
167                         r.subIds = append(r.subIds, subs.ReqId.Seq)
168                 }
169                 return nil, err
170         }
171
172         if newAlloc {
173                 r.register[subs.ReqId.Seq] = subs
174         }
175         xapp.Logger.Debug("CREATE %s", subs.String())
176         xapp.Logger.Debug("Registry: substable=%v", r.register)
177         return subs, nil
178 }
179
180 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
181         var reportFound bool = false
182         var policyFound bool = false
183
184         for _, acts := range subReqMsg.ActionSetups {
185                 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
186                         reportFound = true
187                 }
188                 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
189                         policyFound = true
190                 }
191         }
192         if reportFound == true && policyFound == true {
193                 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
194         }
195         if reportFound == true {
196                 return e2ap.E2AP_ActionTypeReport, nil
197         }
198         if policyFound == true {
199                 return e2ap.E2AP_ActionTypePolicy, nil
200         }
201         return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
202 }
203
204 // TODO: Works with concurrent calls, but check if can be improved
205 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
206
207         r.mutex.Lock()
208         defer r.mutex.Unlock()
209         subs.mutex.Lock()
210         defer subs.mutex.Unlock()
211
212         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
213         epamount := subs.EpList.Size()
214         seqId := subs.ReqId.Seq
215
216         if delStatus == false {
217                 return nil
218         }
219
220         r.mutex.Unlock()
221
222         //
223         // Wait some time before really do route updates
224         //
225         if waitRouteClean > 0 {
226                 subs.mutex.Unlock()
227                 time.Sleep(waitRouteClean)
228                 subs.mutex.Lock()
229         }
230
231         xapp.Logger.Info("CLEAN %s", subs.String())
232
233         //
234         // Subscription route updates
235         //
236         if epamount == 0 {
237                 tmpList := RmrEndpointList{}
238                 tmpList.AddEndpoint(trans.GetEndpoint())
239                 subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
240                 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
241         } else if subs.EpList.Size() > 0 {
242                 subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
243                 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
244         }
245
246         r.mutex.Lock()
247         //
248         // If last endpoint, release and free seqid
249         //
250         if epamount == 0 {
251                 if _, ok := r.register[seqId]; ok {
252                         xapp.Logger.Debug("RELEASE %s", subs.String())
253                         delete(r.register, seqId)
254                         xapp.Logger.Debug("Registry: substable=%v", r.register)
255                 }
256                 r.subIds = append(r.subIds, seqId)
257         }
258
259         return nil
260 }
261
262 func (r *Registry) GetSubscription(sn uint32) *Subscription {
263         r.mutex.Lock()
264         defer r.mutex.Unlock()
265         if _, ok := r.register[sn]; ok {
266                 return r.register[sn]
267         }
268         return nil
269 }
270
271 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
272         r.mutex.Lock()
273         defer r.mutex.Unlock()
274         for _, id := range ids {
275                 if _, ok := r.register[id]; ok {
276                         return r.register[id], nil
277                 }
278         }
279         return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
280 }