2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 //-----------------------------------------------------------------------------
32 //-----------------------------------------------------------------------------
33 type Registry struct {
35 register map[uint16]*Subscription
37 rtmgrClient *RtmgrClient
40 func (r *Registry) Initialize() {
41 r.register = make(map[uint16]*Subscription)
43 for i = 0; i < 65535; i++ {
44 r.subIds = append(r.subIds, i+1)
48 func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
49 if len(r.subIds) > 0 {
50 sequenceNumber := r.subIds[0]
51 r.subIds = r.subIds[1:]
52 if _, ok := r.register[sequenceNumber]; ok == true {
53 r.subIds = append(r.subIds, sequenceNumber)
54 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
56 subs := &Subscription{
64 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
65 r.subIds = append(r.subIds, subs.Seq)
66 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
71 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
74 func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
75 for _, subs := range r.register {
76 if subs.IsSame(trans, subReqMsg) {
79 // check if there has been race conditions
82 //subs has been set to invalid
83 if subs.valid == false {
87 // try to add to endpointlist.
88 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
94 //Race collision during parallel incoming and deleted
95 xapp.Logger.Debug("Registry: Identical subs found %s for %s", subs.String(), trans.String())
102 func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
106 defer r.mutex.Unlock()
108 subs := r.findExistingSubs(trans, subReqMsg)
111 subs, err = r.allocateSubs(trans, subReqMsg)
119 // Add to subscription
122 defer subs.mutex.Unlock()
124 epamount := subs.EpList.Size()
128 // Subscription route updates
131 subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq}
132 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
134 subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
135 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
141 r.subIds = append(r.subIds, subs.Seq)
147 r.register[subs.Seq] = subs
149 xapp.Logger.Debug("Registry: Create %s", subs.String())
150 xapp.Logger.Debug("Registry: substable=%v", r.register)
154 // TODO: Needs better logic when there is concurrent calls
155 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error {
158 defer r.mutex.Unlock()
160 defer subs.mutex.Unlock()
162 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
163 epamount := subs.EpList.Size()
166 // If last endpoint remove from register map
169 if _, ok := r.register[subs.Seq]; ok {
170 xapp.Logger.Debug("Registry: Delete %s", subs.String())
171 delete(r.register, subs.Seq)
172 xapp.Logger.Debug("Registry: substable=%v", r.register)
178 // Wait some time before really do route updates
180 if waitRouteClean > 0 {
182 time.Sleep(waitRouteClean)
186 xapp.Logger.Info("Registry: Cleaning %s", subs.String())
189 // Subscription route updates
193 tmpList := RmrEndpointList{}
194 tmpList.AddEndpoint(trans.GetEndpoint())
195 subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq}
196 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
198 subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
199 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
205 // If last endpoint free seq nro
208 r.subIds = append(r.subIds, subs.Seq)
214 func (r *Registry) GetSubscription(sn uint16) *Subscription {
216 defer r.mutex.Unlock()
217 if _, ok := r.register[sn]; ok {
218 return r.register[sn]
223 func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) {
225 defer r.mutex.Unlock()
226 for _, id := range ids {
227 if _, ok := r.register[id]; ok {
228 return r.register[id], nil
231 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)