e00062b7b4ea9140b7516ee90aa023ce4bc17fda
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
26         "sync"
27         "time"
28 )
29
30 //-----------------------------------------------------------------------------
31 //
32 //-----------------------------------------------------------------------------
33
34 type Registry struct {
35         mutex       sync.Mutex
36         register    map[uint32]*Subscription
37         subIds      []uint32
38         rtmgrClient *RtmgrClient
39 }
40
41 func (r *Registry) Initialize() {
42         r.register = make(map[uint32]*Subscription)
43         var i uint32
44         for i = 0; i < 65535; i++ {
45                 r.subIds = append(r.subIds, i+1)
46         }
47 }
48
49 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
50         if len(r.subIds) > 0 {
51                 sequenceNumber := r.subIds[0]
52                 r.subIds = r.subIds[1:]
53                 if _, ok := r.register[sequenceNumber]; ok == true {
54                         r.subIds = append(r.subIds, sequenceNumber)
55                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
56                 }
57                 subs := &Subscription{
58                         registry:  r,
59                         Meid:      trans.Meid,
60                         SubReqMsg: subReqMsg,
61                         valid:     true,
62                 }
63                 subs.ReqId.Id = 123
64                 subs.ReqId.Seq = sequenceNumber
65
66                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
67                         r.subIds = append(r.subIds, subs.ReqId.Seq)
68                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
69                 }
70
71                 return subs, nil
72         }
73         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
74 }
75
76 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
77
78         for _, subs := range r.register {
79                 if subs.IsMergeable(trans, subReqMsg) {
80
81                         //
82                         // check if there has been race conditions
83                         //
84                         subs.mutex.Lock()
85                         //subs has been set to invalid
86                         if subs.valid == false {
87                                 subs.mutex.Unlock()
88                                 continue
89                         }
90                         // If size is zero, entry is to be deleted
91                         if subs.EpList.Size() == 0 {
92                                 subs.mutex.Unlock()
93                                 continue
94                         }
95                         // try to add to endpointlist.
96                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
97                                 subs.mutex.Unlock()
98                                 continue
99                         }
100                         subs.mutex.Unlock()
101
102                         xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
103                         return subs
104                 }
105         }
106         return nil
107 }
108
109 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
110         var err error
111         var newAlloc bool
112         r.mutex.Lock()
113         defer r.mutex.Unlock()
114
115         subs := r.findExistingSubs(trans, subReqMsg)
116
117         if subs == nil {
118                 subs, err = r.allocateSubs(trans, subReqMsg)
119                 if err != nil {
120                         return nil, err
121                 }
122                 newAlloc = true
123         }
124
125         //
126         // Add to subscription
127         //
128         subs.mutex.Lock()
129         defer subs.mutex.Unlock()
130
131         epamount := subs.EpList.Size()
132
133         r.mutex.Unlock()
134         //
135         // Subscription route updates
136         //
137         if epamount == 1 {
138                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
139                 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
140         } else {
141                 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
142                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
143         }
144         r.mutex.Lock()
145
146         if err != nil {
147                 if newAlloc {
148                         r.subIds = append(r.subIds, subs.ReqId.Seq)
149                 }
150                 return nil, err
151         }
152
153         if newAlloc {
154                 r.register[subs.ReqId.Seq] = subs
155         }
156         xapp.Logger.Debug("CREATE %s", subs.String())
157         xapp.Logger.Debug("Registry: substable=%v", r.register)
158         return subs, nil
159 }
160
161 // TODO: Works with concurrent calls, but check if can be improved
162 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
163
164         r.mutex.Lock()
165         defer r.mutex.Unlock()
166         subs.mutex.Lock()
167         defer subs.mutex.Unlock()
168
169         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
170         epamount := subs.EpList.Size()
171         seqId := subs.ReqId.Seq
172
173         if delStatus == false {
174                 return nil
175         }
176
177         r.mutex.Unlock()
178
179         //
180         // Wait some time before really do route updates
181         //
182         if waitRouteClean > 0 {
183                 subs.mutex.Unlock()
184                 time.Sleep(waitRouteClean)
185                 subs.mutex.Lock()
186         }
187
188         xapp.Logger.Info("CLEAN %s", subs.String())
189
190         //
191         // Subscription route updates
192         //
193         if epamount == 0 {
194                 tmpList := RmrEndpointList{}
195                 tmpList.AddEndpoint(trans.GetEndpoint())
196                 subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
197                 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
198         } else if subs.EpList.Size() > 0 {
199                 subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
200                 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
201         }
202
203         r.mutex.Lock()
204         //
205         // If last endpoint, release and free seqid
206         //
207         if epamount == 0 {
208                 if _, ok := r.register[seqId]; ok {
209                         xapp.Logger.Debug("RELEASE %s", subs.String())
210                         delete(r.register, seqId)
211                         xapp.Logger.Debug("Registry: substable=%v", r.register)
212                 }
213                 r.subIds = append(r.subIds, seqId)
214         }
215
216         return nil
217 }
218
219 func (r *Registry) GetSubscription(sn uint32) *Subscription {
220         r.mutex.Lock()
221         defer r.mutex.Unlock()
222         if _, ok := r.register[sn]; ok {
223                 return r.register[sn]
224         }
225         return nil
226 }
227
228 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
229         r.mutex.Lock()
230         defer r.mutex.Unlock()
231         for _, id := range ids {
232                 if _, ok := r.register[id]; ok {
233                         return r.register[id], nil
234                 }
235         }
236         return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
237 }