RICPLT-3015 Reusing subscription if same EventTrigger and Action
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
26         "sync"
27         "time"
28 )
29
30 //-----------------------------------------------------------------------------
31 //
32 //-----------------------------------------------------------------------------
33 type Registry struct {
34         mutex       sync.Mutex
35         register    map[uint16]*Subscription
36         subIds      []uint16
37         rtmgrClient *RtmgrClient
38 }
39
40 func (r *Registry) Initialize() {
41         r.register = make(map[uint16]*Subscription)
42         var i uint16
43         for i = 0; i < 65535; i++ {
44                 r.subIds = append(r.subIds, i+1)
45         }
46 }
47
48 func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
49         if len(r.subIds) > 0 {
50                 sequenceNumber := r.subIds[0]
51                 r.subIds = r.subIds[1:]
52                 if _, ok := r.register[sequenceNumber]; ok == true {
53                         r.subIds = append(r.subIds, sequenceNumber)
54                         return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
55                 }
56                 subs := &Subscription{
57                         registry:  r,
58                         Seq:       sequenceNumber,
59                         Meid:      trans.Meid,
60                         SubReqMsg: subReqMsg,
61                         valid:     true,
62                 }
63
64                 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
65                         r.subIds = append(r.subIds, subs.Seq)
66                         return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
67                 }
68
69                 return subs, nil
70         }
71         return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
72 }
73
74 func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
75         for _, subs := range r.register {
76                 if subs.IsSame(trans, subReqMsg) {
77
78                         //
79                         // check if there has been race conditions
80                         //
81                         subs.mutex.Lock()
82                         //subs has been set to invalid
83                         if subs.valid == false {
84                                 subs.mutex.Unlock()
85                                 continue
86                         }
87                         // try to add to endpointlist.
88                         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
89                                 subs.mutex.Unlock()
90                                 continue
91                         }
92                         subs.mutex.Unlock()
93
94                         //Race collision during parallel incoming and deleted
95                         xapp.Logger.Debug("Registry: Identical subs found %s for %s", subs.String(), trans.String())
96                         return subs
97                 }
98         }
99         return nil
100 }
101
102 func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
103         var err error
104         var newAlloc bool
105         r.mutex.Lock()
106         defer r.mutex.Unlock()
107
108         subs := r.findExistingSubs(trans, subReqMsg)
109
110         if subs == nil {
111                 subs, err = r.allocateSubs(trans, subReqMsg)
112                 if err != nil {
113                         return nil, err
114                 }
115                 newAlloc = true
116         }
117
118         //
119         // Add to subscription
120         //
121         subs.mutex.Lock()
122         defer subs.mutex.Unlock()
123
124         epamount := subs.EpList.Size()
125
126         r.mutex.Unlock()
127         //
128         // Subscription route updates
129         //
130         if epamount == 1 {
131                 subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq}
132                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
133         } else {
134                 subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
135                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
136         }
137         r.mutex.Lock()
138
139         if err != nil {
140                 if newAlloc {
141                         r.subIds = append(r.subIds, subs.Seq)
142                 }
143                 return nil, err
144         }
145
146         if newAlloc {
147                 r.register[subs.Seq] = subs
148         }
149         xapp.Logger.Debug("Registry: Create %s", subs.String())
150         xapp.Logger.Debug("Registry: substable=%v", r.register)
151         return subs, nil
152 }
153
154 // TODO: Needs better logic when there is concurrent calls
155 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error {
156
157         r.mutex.Lock()
158         defer r.mutex.Unlock()
159         subs.mutex.Lock()
160         defer subs.mutex.Unlock()
161
162         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
163         epamount := subs.EpList.Size()
164
165         //
166         // If last endpoint remove from register map
167         //
168         if epamount == 0 {
169                 if _, ok := r.register[subs.Seq]; ok {
170                         xapp.Logger.Debug("Registry: Delete %s", subs.String())
171                         delete(r.register, subs.Seq)
172                         xapp.Logger.Debug("Registry: substable=%v", r.register)
173                 }
174         }
175         r.mutex.Unlock()
176
177         //
178         // Wait some time before really do route updates
179         //
180         if waitRouteClean > 0 {
181                 subs.mutex.Unlock()
182                 time.Sleep(waitRouteClean)
183                 subs.mutex.Lock()
184         }
185
186         xapp.Logger.Info("Registry: Cleaning %s", subs.String())
187
188         //
189         // Subscription route updates
190         //
191         if delStatus {
192                 if epamount == 0 {
193                         tmpList := RmrEndpointList{}
194                         tmpList.AddEndpoint(trans.GetEndpoint())
195                         subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq}
196                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
197                 } else {
198                         subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
199                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
200                 }
201         }
202
203         r.mutex.Lock()
204         //
205         // If last endpoint free seq nro
206         //
207         if epamount == 0 {
208                 r.subIds = append(r.subIds, subs.Seq)
209         }
210
211         return nil
212 }
213
214 func (r *Registry) GetSubscription(sn uint16) *Subscription {
215         r.mutex.Lock()
216         defer r.mutex.Unlock()
217         if _, ok := r.register[sn]; ok {
218                 return r.register[sn]
219         }
220         return nil
221 }
222
223 func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) {
224         r.mutex.Lock()
225         defer r.mutex.Unlock()
226         for _, id := range ids {
227                 if _, ok := r.register[id]; ok {
228                         return r.register[id], nil
229                 }
230         }
231         return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
232 }