2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 //-----------------------------------------------------------------------------
32 //-----------------------------------------------------------------------------
// Registry is the bookkeeping structure for subscriptions.
// NOTE(review): the methods of this type also use r.mutex and r.subIds;
// their field declarations are not visible in this view, confirm their types.
34 type Registry struct {
// register maps a subscription id to its Subscription. Entries are stored
// under subs.ReqId.Seq by AssignToSubscription.
36 register map[uint32]*Subscription
// rtmgrClient is the client on which the SubscriptionRequestCreate, Update
// and Delete route calls are made.
38 rtmgrClient *RtmgrClient
// Initialize creates an empty register map and appends the ids 1 through
// 65535 to r.subIds, the list allocateSubs takes ids from.
41 func (r *Registry) Initialize() {
42 r.register = make(map[uint32]*Subscription)
// i runs from 0 to 65534 and i+1 is stored, so 0 is never added as an id.
// NOTE(review): the declaration of i is not visible in this view.
44 for i = 0; i < 65535; i++ {
45 r.subIds = append(r.subIds, i+1)
// allocateSubs takes the id at the front of r.subIds and builds a new
// Subscription that uses it as ReqId.Seq and has the endpoint of trans in
// its EpList.
//
// It returns an error when r.subIds is empty, when the id taken is already
// a key of r.register, or when AddEndpoint reports false. In the last two
// cases the id is appended back to r.subIds before returning.
//
// The visible lines do not insert the new subscription into r.register;
// AssignToSubscription does that after the route update.
// NOTE(review): no locking is visible here; presumably the caller holds
// r.mutex, confirm.
49 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
50 if len(r.subIds) > 0 {
// Remove the first id from the list of available ids.
51 sequenceNumber := r.subIds[0]
52 r.subIds = r.subIds[1:]
// If the id is unexpectedly still registered, put it back at the tail of
// the list and fail.
53 if _, ok := r.register[sequenceNumber]; ok == true {
54 r.subIds = append(r.subIds, sequenceNumber)
55 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
// NOTE(review): the fields set in this literal are not visible in this view.
57 subs := &Subscription{
64 subs.ReqId.Seq = sequenceNumber
// The endpoint could not be added: give the id back and fail.
66 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
67 r.subIds = append(r.subIds, subs.ReqId.Seq)
68 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when r.subIds is empty.
73 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that reports itself
// mergeable with the request (subs.IsMergeable) and tries to add the
// endpoint of trans to that subscription's EpList.
//
// NOTE(review): the locking, the action taken when each check below fails,
// and the return statements are not visible in this view. Presumably a
// failed check skips the candidate and nil is returned when nothing
// matches; confirm against the full file.
76 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
// Go does not define map iteration order, so when several entries are
// mergeable the one examined first can differ between calls.
78 for _, subs := range r.register {
79 if subs.IsMergeable(trans, subReqMsg) {
82 // check if there has been race conditions
85 //subs has been set to invalid
86 if subs.valid == false {
90 // If size is zero, entry is to be deleted
91 if subs.EpList.Size() == 0 {
95 // try to add to endpointlist.
// AddEndpoint reporting false means the endpoint was not added.
96 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
102 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the transaction to a subscription for the
// given request and returns that subscription.
//
// Steps visible in this view:
//   - CheckActionTypes classifies the request.
//   - For a policy request, r.register is looked up by
//     subReqMsg.RequestId.Seq; when an entry exists its cached response is
//     reset with SetCachedResponse(nil, true).
//   - findExistingSubs looks for a mergeable subscription and allocateSubs
//     creates a new one.
//   - The route is created or updated through r.rtmgrClient.
//   - The subscription is stored in r.register under subs.ReqId.Seq.
//
// NOTE(review): the conditions guarding most of these steps, the Lock calls
// matching the deferred Unlocks, and the return statements are not visible
// in this view; the inline notes below mark what is assumed.
109 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
// r.mutex is released when this function returns.
113 defer r.mutex.Unlock()
116 // Check validity of subscription action types
118 actionType, err := r.CheckActionTypes(subReqMsg)
// Presumably logged only when err is non-nil; the guard is not visible.
120 xapp.Logger.Debug("CREATE %s", err)
125 // Find possible existing Policy subscription
127 if actionType == e2ap.E2AP_ActionTypePolicy {
128 if subs, ok := r.register[subReqMsg.RequestId.Seq]; ok {
129 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
130 subs.SetCachedResponse(nil, true)
135 subs := r.findExistingSubs(trans, subReqMsg)
// Presumably reached only when no existing subscription was found; the
// guard is not visible.
137 subs, err = r.allocateSubs(trans, subReqMsg)
145 // Add to subscription
// subs.mutex is released when this function returns.
148 defer subs.mutex.Unlock()
150 epamount := subs.EpList.Size()
154 // Subscription route updates
// Presumably epamount selects between creating a route and updating one;
// the condition is not visible. Ids added by Initialize are at most 65535,
// so the uint16 conversion does not truncate them.
157 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
158 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
160 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
161 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// The id is appended back to r.subIds, presumably on the route-failure
// path; the guard is not visible.
167 r.subIds = append(r.subIds, subs.ReqId.Seq)
// Register the subscription under its id.
173 r.register[subs.ReqId.Seq] = subs
175 xapp.Logger.Debug("CREATE %s", subs.String())
176 xapp.Logger.Debug("Registry: substable=%v", r.register)
// CheckActionTypes classifies the action types in subReqMsg.ActionSetups.
//
// It returns E2AP_ActionTypeReport when only reportFound is set and
// E2AP_ActionTypePolicy when only policyFound is set, both with a nil
// error. It returns E2AP_ActionTypeInvalid with an error when both flags
// are set or when neither is.
// NOTE(review): the statements that set the two flags inside the loop are
// not visible in this view; presumably each matching branch sets its flag.
180 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
181 var reportFound bool = false
182 var policyFound bool = false
184 for _, acts := range subReqMsg.ActionSetups {
185 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
188 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
// Report and policy actions in the same request are rejected.
192 if reportFound == true && policyFound == true {
193 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
195 if reportFound == true {
196 return e2ap.E2AP_ActionTypeReport, nil
198 if policyFound == true {
199 return e2ap.E2AP_ActionTypePolicy, nil
// Neither flag was set.
201 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription removes the endpoint of trans from subs.EpList,
// updates the routes through r.rtmgrClient, and, when the subscription id
// is still a key of r.register, deletes that entry and appends the id back
// to r.subIds.
//
// waitRouteClean, when greater than zero, is slept for before the route
// updates are made.
//
// NOTE(review): the Lock calls, several branch conditions, any goroutine
// structure and the return statements are not visible in this view; the
// inline notes below mark what is assumed.
204 // TODO: Works with concurrent calls, but check if can be improved
205 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
208 defer r.mutex.Unlock()
210 defer subs.mutex.Unlock()
// Capture the removal result, the remaining endpoint count and the id.
212 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
213 epamount := subs.EpList.Size()
214 seqId := subs.ReqId.Seq
// The endpoint was not removed. Presumably the function returns here; the
// branch body is not visible.
216 if delStatus == false {
223 // Wait some time before really do route updates
225 if waitRouteClean > 0 {
// NOTE(review): confirm which locks, if any, are held during this sleep;
// the surrounding lines are not visible.
227 time.Sleep(waitRouteClean)
231 xapp.Logger.Info("CLEAN %s", subs.String())
234 // Subscription route updates
// The delete request carries a temporary list holding only the endpoint of
// trans. Presumably this branch runs when no endpoints remain; the
// condition is not visible.
237 tmpList := RmrEndpointList{}
238 tmpList.AddEndpoint(trans.GetEndpoint())
239 subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
240 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
241 } else if subs.EpList.Size() > 0 {
242 subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
// NOTE(review): the result of SubscriptionRequestUpdate is discarded here,
// while AssignToSubscription assigns it to err.
243 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
248 // If last endpoint, release and free seqid
251 if _, ok := r.register[seqId]; ok {
252 xapp.Logger.Debug("RELEASE %s", subs.String())
253 delete(r.register, seqId)
254 xapp.Logger.Debug("Registry: substable=%v", r.register)
// Make the id available to allocateSubs again.
256 r.subIds = append(r.subIds, seqId)
// GetSubscription returns the subscription stored in r.register under sn.
// r.mutex is released when the function returns.
// NOTE(review): the Lock call and the value returned when sn is absent are
// not visible in this view; presumably nil is returned, confirm.
262 func (r *Registry) GetSubscription(sn uint32) *Subscription {
264 defer r.mutex.Unlock()
265 if _, ok := r.register[sn]; ok {
266 return r.register[sn]
// GetSubscriptionFirstMatch walks ids in slice order and returns the
// subscription for the first id that is a key of r.register. When none of
// the ids is registered it returns nil and an error listing ids.
// r.mutex is released when the function returns.
// NOTE(review): the matching Lock call is not visible in this view.
271 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
273 defer r.mutex.Unlock()
274 for _, id := range ids {
275 if _, ok := r.register[id]; ok {
276 return r.register[id], nil
279 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)