2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 //-----------------------------------------------------------------------------
32 //-----------------------------------------------------------------------------
// Registry is the table of known subscriptions, keyed by a uint32 id.
// NOTE(review): only part of this struct is visible in this view; the methods
// in this file also use r.mutex and r.subIds, whose declarations are not shown.
34 type Registry struct {
// register maps an id to its Subscription; AssignToSubscription stores each
// entry under subs.ReqId.Seq.
36 register map[uint32]*Subscription
// rtmgrClient is the client on which the SubscriptionRequestCreate/Update/Delete
// calls for subscription routes are made.
38 rtmgrClient *RtmgrClient
// Initialize creates an empty subscription map and appends the values
// 1..65535, in ascending order, to the pool of free ids (r.subIds).
// NOTE(review): the declaration of i is not visible in this view.
41 func (r *Registry) Initialize() {
42 r.register = make(map[uint32]*Subscription)
44 for i = 0; i < 65535; i++ {
// i+1 is appended, so the value 0 is never placed in the pool here.
45 r.subIds = append(r.subIds, i+1)
// allocateSubs takes the id at the head of the free-id pool and builds a new
// Subscription that uses it as ReqId.Seq, with the endpoint of trans added to
// its endpoint list.
//
// It returns an error when the pool is empty, when the id taken from the pool
// is already present in r.register, or when the endpoint cannot be added. In
// the latter two cases the id is appended back to the tail of the pool.
//
// NOTE(review): the fields of the Subscription literal and the success return
// are not visible in this view. Presumably the caller holds r.mutex — confirm.
49 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
50 if len(r.subIds) > 0 {
// Pop the first free id from the head of the pool.
51 sequenceNumber := r.subIds[0]
52 r.subIds = r.subIds[1:]
// An id taken from the free pool should not be registered; if it is, hand
// the id back and refuse the allocation.
53 if _, ok := r.register[sequenceNumber]; ok == true {
54 r.subIds = append(r.subIds, sequenceNumber)
55 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
57 subs := &Subscription{
64 subs.ReqId.Seq = sequenceNumber
// Register the requesting endpoint; on failure the id goes back to the pool.
66 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
67 r.subIds = append(r.subIds, subs.ReqId.Seq)
68 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when the free-id pool is empty.
73 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs walks the registered subscriptions looking for one that
// subs.IsMergeable reports as mergeable with the given transaction and request.
// For such a candidate it checks subs.valid, checks whether the endpoint list is
// empty, and attempts to add the endpoint of trans to it, then logs the match.
//
// NOTE(review): the bodies of the three checks and the return statements are
// not visible in this view; presumably a failed check rejects the candidate and
// nil is returned when nothing matches — confirm. Go map iteration order is
// unspecified, so which of several mergeable candidates is seen first is not
// deterministic.
76 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
78 for _, subs := range r.register {
79 if subs.IsMergeable(trans, subReqMsg) {
82 // Check whether a race condition has occurred.
85 // subs has been marked invalid.
86 if subs.valid == false {
90 // If size is zero, the entry is about to be deleted.
91 if subs.EpList.Size() == 0 {
95 // Try to add the endpoint to the endpoint list.
96 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
102 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the transaction to a subscription: it looks for
// an existing subscription via findExistingSubs and can allocate a new one via
// allocateSubs. It then issues a route create or route update request to the
// routing manager client and stores the subscription in r.register under
// subs.ReqId.Seq.
//
// NOTE(review): the lock acquisitions matching the deferred unlocks, the
// condition that selects allocation, the condition that selects create versus
// update, and the error/return paths are not visible in this view.
109 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
113 defer r.mutex.Unlock()
115 subs := r.findExistingSubs(trans, subReqMsg)
// NOTE(review): presumably only reached when no mergeable subscription was
// found — confirm against the full file.
118 subs, err = r.allocateSubs(trans, subReqMsg)
126 // Add to subscription
129 defer subs.mutex.Unlock()
// Number of endpoints currently attached to the subscription.
131 epamount := subs.EpList.Size()
135 // Subscription route updates
// The id is narrowed to uint16 for the route request; Initialize only puts
// values up to 65535 into the pool, which fit in uint16.
138 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
139 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
141 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
142 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// The id is appended back to the free pool.
// NOTE(review): presumably the route-request failure path — confirm.
148 r.subIds = append(r.subIds, subs.ReqId.Seq)
// Record the subscription under its id.
154 r.register[subs.ReqId.Seq] = subs
156 xapp.Logger.Debug("CREATE %s", subs.String())
157 xapp.Logger.Debug("Registry: substable=%v", r.register)
161 // TODO: Works with concurrent calls, but check if can be improved
//
// RemoveFromSubscription detaches the endpoint of trans from subs. It removes
// the endpoint from the subscription's endpoint list, optionally sleeps for
// waitRouteClean, then issues a route delete or route update request to the
// routing manager client. When the entry is still present in r.register it is
// deleted, and the id is appended back to the free-id pool.
//
// NOTE(review): several lines are missing from this view (lock acquisitions,
// the early return when delStatus is false, the condition guarding the delete
// branch and whatever encloses the sleep), so it cannot be determined here
// whether the sleep runs while r.mutex / subs.mutex are held — confirm.
162 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
165 defer r.mutex.Unlock()
167 defer subs.mutex.Unlock()
// Capture the outcome of the removal, the remaining endpoint count and the id
// up front so the later steps work from these values.
169 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
170 epamount := subs.EpList.Size()
171 seqId := subs.ReqId.Seq
// DelEndpoint reported false.
// NOTE(review): presumably nothing was removed and the function returns
// early — the branch body is not visible.
173 if delStatus == false {
180 // Wait some time before actually doing the route updates
182 if waitRouteClean > 0 {
184 time.Sleep(waitRouteClean)
188 xapp.Logger.Info("CLEAN %s", subs.String())
191 // Subscription route updates
// Delete request: built from a temporary list holding only the endpoint of
// trans.
194 tmpList := RmrEndpointList{}
195 tmpList.AddEndpoint(trans.GetEndpoint())
196 subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
// NOTE(review): the result of the delete call is not checked here, unlike the
// create/update calls in AssignToSubscription, which assign theirs to err.
197 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
// Endpoints remain: send the remaining endpoint list as a route update.
198 } else if subs.EpList.Size() > 0 {
199 subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
// NOTE(review): the result of the update call is likewise not checked.
200 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
205 // If last endpoint, release and free seqid
208 if _, ok := r.register[seqId]; ok {
209 xapp.Logger.Debug("RELEASE %s", subs.String())
210 delete(r.register, seqId)
211 xapp.Logger.Debug("Registry: substable=%v", r.register)
// Return the id to the tail of the free pool for later reuse.
213 r.subIds = append(r.subIds, seqId)
// GetSubscription returns the subscription registered under id sn.
// NOTE(review): the lock acquisition and the not-found return are not visible
// in this view; presumably nil is returned when sn is not registered — confirm.
219 func (r *Registry) GetSubscription(sn uint32) *Subscription {
221 defer r.mutex.Unlock()
// NOTE(review): the map is looked up twice; a single
// `if subs, ok := r.register[sn]; ok` would do the same with one lookup.
222 if _, ok := r.register[sn]; ok {
223 return r.register[sn]
// GetSubscriptionFirstMatch scans ids in order and returns the subscription
// registered under the first id found in r.register. If none of the ids is
// registered it returns nil and an error that lists the ids.
// NOTE(review): the lock acquisition matching the deferred unlock is not
// visible in this view.
228 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
230 defer r.mutex.Unlock()
231 for _, id := range ids {
// NOTE(review): the map is looked up twice; the value could be taken from the
// first lookup.
232 if _, ok := r.register[id]; ok {
233 return r.register[id], nil
236 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)