2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
// Registry is the bookkeeping structure for subscriptions. It keeps a map
// from subscription id to *Subscription and a routing manager client that the
// methods use for subscription route create/update/delete requests.
// NOTE(review): the methods also use r.mutex and r.subIds; their declarations
// are not visible in this view — confirm their types against the full struct.
36 type Registry struct {
// register maps a subscription id to its Subscription. AssignToSubscription
// stores entries under subs.ReqId.InstanceId.
38 register map[uint32]*Subscription
// rtmgrClient is used for SubscriptionRequestCreate/Update/Delete calls.
40 rtmgrClient *RtmgrClient
// Initialize creates an empty subscription map and appends the ids 1 through
// 65535, in ascending order, to the free-id pool r.subIds.
43 func (r *Registry) Initialize() {
44 r.register = make(map[uint32]*Subscription)
// NOTE(review): the declaration of i is not visible in this view — confirm
// its type matches the element type of r.subIds.
46 for i = 0; i < 65535; i++ {
// i runs from 0 to 65534 and i+1 is stored, so id 0 is never put in the pool.
47 r.subIds = append(r.subIds, i+1)
// QueryHandler builds a models.SubscriptionList containing one
// SubscriptionData entry for every subscription currently in r.register.
// NOTE(review): the return statement is not visible in this view; presumably
// it returns resp with a nil error — confirm.
51 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
// The registry mutex is released when the function returns.
// NOTE(review): the matching Lock call is not visible in this view — confirm.
53 defer r.mutex.Unlock()
55 resp := models.SubscriptionList{}
// r.register is a map, so the order in which entries are appended is not
// defined and may differ between calls.
56 for _, subs := range r.register {
// Each entry carries the instance id widened to int64, the RAN name taken
// from subs.Meid, and the endpoint list rendered by EpList.StringList().
58 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
// allocateSubs takes an id out of the free-id pool r.subIds and builds a new
// Subscription that uses it as ReqId.InstanceId and has the endpoint of trans
// in its endpoint list.
//
// It returns a nil subscription and an error when the id is already present
// in r.register, when the endpoint cannot be added to the new subscription's
// endpoint list, or when the pool is empty. In the first two cases the id is
// appended back to the pool before returning.
//
// None of the visible lines store the new subscription in r.register;
// AssignToSubscription does that.
// NOTE(review): no locking is visible in this function — presumably the
// caller holds r.mutex; confirm.
64 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
65 if len(r.subIds) > 0 {
// Drop the first element from the pool.
// NOTE(review): the assignment of subId is not visible in this view;
// presumably it is taken from r.subIds[0] just before this line — confirm.
67 r.subIds = r.subIds[1:]
68 if _, ok := r.register[subId]; ok == true {
// The id is still in use: put it at the back of the pool and fail.
69 r.subIds = append(r.subIds, subId)
70 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
// NOTE(review): the fields set in this struct literal are not visible in
// this view.
72 subs := &Subscription{
79 subs.ReqId.InstanceId = subId
81 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
// AddEndpoint reported failure: give the id back to the pool and fail.
82 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
83 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when the free-id pool is empty.
88 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that
// subs.IsMergeable(trans, subReqMsg) accepts, and attempts to add the
// endpoint of trans to that subscription's endpoint list.
//
// NOTE(review): the bodies of the checks below and all return statements are
// not visible in this view. Presumably a candidate is skipped when it is
// invalid, has an empty endpoint list, or rejects the endpoint, and nil is
// returned when nothing matches — confirm against the full source.
91 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
// r.register is a map, so when several subscriptions are mergeable the one
// examined first is not defined.
93 for _, subs := range r.register {
94 if subs.IsMergeable(trans, subReqMsg) {
97 // check if there has been race conditions
100 //subs has been set to invalid
101 if subs.valid == false {
105 // If size is zero, entry is to be deleted
106 if subs.EpList.Size() == 0 {
110 // try to add to endpointlist.
111 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
117 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the transaction to a subscription and
// returns that subscription.
//
// Visible steps, in order:
//   - the request's action types are classified with CheckActionTypes;
//   - for a Policy request, an existing subscription is looked up in
//     r.register under trans.GetSubId() and, when found, its stored request
//     message is replaced with subReqMsg;
//   - otherwise findExistingSubs is tried, and allocateSubs is called to
//     create a new subscription;
//   - a subscription route create or update request is sent through
//     r.rtmgrClient;
//   - the subscription is stored in r.register under its instance id.
//
// NOTE(review): most branch conditions and every return statement are not
// visible in this view, so the exact conditions linking these steps must be
// confirmed against the full source.
124 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
// The registry mutex is released when the function returns.
// NOTE(review): the matching Lock call is not visible in this view — confirm.
128 defer r.mutex.Unlock()
131 // Check validity of subscription action types
133 actionType, err := r.CheckActionTypes(subReqMsg)
// NOTE(review): presumably logged only when err is non-nil — confirm.
135 xapp.Logger.Debug("CREATE %s", err)
140 // Find possible existing Policy subscription
142 if actionType == e2ap.E2AP_ActionTypePolicy {
143 if subs, ok := r.register[trans.GetSubId()]; ok {
144 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
145 // Update message data to subscription
146 subs.SubReqMsg = subReqMsg
// NOTE(review): presumably this clears a previously cached response so the
// updated request is processed again — confirm against SetCachedResponse.
147 subs.SetCachedResponse(nil, true)
152 subs := r.findExistingSubs(trans, subReqMsg)
// NOTE(review): presumably reached only when no mergeable subscription was
// found — confirm.
154 subs, err = r.allocateSubs(trans, subReqMsg)
162 // Add to subscription
// The subscription's own mutex is released when the function returns.
165 defer subs.mutex.Unlock()
167 epamount := subs.EpList.Size()
171 // Subscription route updates
// The instance id is narrowed to uint16 for the route request; ids handed
// out by Initialize are in 1..65535, which fits in 16 bits.
// NOTE(review): the condition choosing between create and update is not
// visible; presumably it depends on epamount — confirm.
174 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
175 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
177 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
178 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// The instance id is appended back to the free-id pool.
// NOTE(review): presumably part of the route-request failure path — confirm.
184 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
// Register the subscription under its instance id.
190 r.register[subs.ReqId.InstanceId] = subs
192 xapp.Logger.Debug("CREATE %s", subs.String())
193 xapp.Logger.Debug("Registry: substable=%v", r.register)
// CheckActionTypes classifies a subscription request by the action types
// found in subReqMsg.ActionSetups.
//
// Returns:
//   - E2AP_ActionTypeInvalid and an error when both Report and Policy were
//     found;
//   - E2AP_ActionTypeReport and nil when only Report was found;
//   - E2AP_ActionTypePolicy and nil when only Policy was found;
//   - E2AP_ActionTypeInvalid and an error when neither was found.
197 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
198 var reportFound bool = false
199 var policyFound bool = false
// NOTE(review): the statements inside the two checks below are not visible
// in this view; presumably they set reportFound and policyFound — confirm.
201 for _, acts := range subReqMsg.ActionSetups {
202 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
205 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
// Mixing Report and Policy actions in a single request is rejected.
209 if reportFound == true && policyFound == true {
210 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
212 if reportFound == true {
213 return e2ap.E2AP_ActionTypeReport, nil
215 if policyFound == true {
216 return e2ap.E2AP_ActionTypePolicy, nil
// Neither flag was set.
218 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
221 // TODO: This works with concurrent calls, but check whether it can be improved.
// RemoveFromSubscription removes the endpoint of trans from the endpoint list
// of subs and then performs route cleanup.
//
// Visible steps:
//   - the endpoint is deleted from subs.EpList; the remaining endpoint count
//     and the subscription instance id are captured;
//   - when waitRouteClean is positive, the code sleeps for that duration;
//   - a route delete request is sent for the removed endpoint only;
//   - the subscription is deleted from r.register if present and its id is
//     appended back to the free-id pool r.subIds;
//   - when endpoints remain, a route update request is sent with the
//     remaining endpoint list.
//
// NOTE(review): r.mutex and subs.mutex each have two deferred Unlock calls
// below. Presumably the second pair belongs to a separate function literal
// (possibly run as a goroutine) whose opening line is not visible in this
// view — confirm. Branch conditions and return statements are also not
// visible.
222 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
225 defer r.mutex.Unlock()
227 defer subs.mutex.Unlock()
229 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
// Endpoint count after the removal attempt.
230 epamount := subs.EpList.Size()
231 subId := subs.ReqId.InstanceId
// NOTE(review): the handling of a failed delete is not visible in this view.
233 if delStatus == false {
// Optional delay before the cleanup that follows.
238 if waitRouteClean > 0 {
239 time.Sleep(waitRouteClean)
243 defer subs.mutex.Unlock()
244 xapp.Logger.Info("CLEAN %s", subs.String())
248 // Subscription route delete
// The delete request carries a temporary list holding only the endpoint
// that was removed, not the subscription's full endpoint list.
250 tmpList := xapptweaks.RmrEndpointList{}
251 tmpList.AddEndpoint(trans.GetEndpoint())
252 subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
// NOTE(review): any result of this call is discarded — confirm whether
// SubscriptionRequestDelete returns an error that should be handled.
253 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
256 // Subscription release
259 defer r.mutex.Unlock()
261 if _, ok := r.register[subId]; ok {
262 xapp.Logger.Debug("RELEASE %s", subs.String())
263 delete(r.register, subId)
264 xapp.Logger.Debug("Registry: substable=%v", r.register)
// Give the id back to the free-id pool so it can be allocated again.
266 r.subIds = append(r.subIds, subId)
268 } else if subs.EpList.Size() > 0 {
270 // Subscription route updates
272 subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
// NOTE(review): AssignToSubscription assigns the result of
// SubscriptionRequestUpdate to err; here the result is discarded.
273 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// GetSubscription returns the subscription stored in r.register under subId.
// NOTE(review): the return for a missing id is not visible in this view;
// presumably nil — confirm.
281 func (r *Registry) GetSubscription(subId uint32) *Subscription {
// The registry mutex is released when the function returns.
// NOTE(review): the matching Lock call is not visible in this view — confirm.
283 defer r.mutex.Unlock()
// The map is consulted twice: once for presence, once to fetch the value.
284 if _, ok := r.register[subId]; ok {
285 return r.register[subId]
// GetSubscriptionFirstMatch walks subIds in the given order and returns the
// first subscription found in r.register under one of those ids. When none of
// the ids is registered (including when subIds is empty) it returns nil and
// an error that lists the ids that were tried.
290 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
// The registry mutex is released when the function returns.
// NOTE(review): the matching Lock call is not visible in this view — confirm.
292 defer r.mutex.Unlock()
293 for _, subId := range subIds {
// The map is consulted twice: once for presence, once to fetch the value.
294 if _, ok := r.register[subId]; ok {
295 return r.register[subId], nil
298 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)