2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 //-----------------------------------------------------------------------------
33 //-----------------------------------------------------------------------------
35 type Registry struct {
37 register map[uint32]*Subscription
39 rtmgrClient *RtmgrClient
42 func (r *Registry) Initialize() {
43 r.register = make(map[uint32]*Subscription)
45 for i = 0; i < 65535; i++ {
46 r.subIds = append(r.subIds, i+1)
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52 defer r.mutex.Unlock()
54 resp := models.SubscriptionList{}
55 for _, subs := range r.register {
57 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64 if len(r.subIds) > 0 {
66 r.subIds = r.subIds[1:]
67 if _, ok := r.register[subId]; ok == true {
68 r.subIds = append(r.subIds, subId)
69 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71 subs := &Subscription{
78 subs.ReqId.InstanceId = subId
80 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
82 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
87 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
92 for _, subs := range r.register {
93 if subs.IsMergeable(trans, subReqMsg) {
96 // check if there has been race conditions
99 //subs has been set to invalid
100 if subs.valid == false {
104 // If size is zero, entry is to be deleted
105 if subs.EpList.Size() == 0 {
109 // try to add to endpointlist.
110 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
116 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
123 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
127 defer r.mutex.Unlock()
130 // Check validity of subscription action types
132 actionType, err := r.CheckActionTypes(subReqMsg)
134 xapp.Logger.Debug("CREATE %s", err)
139 // Find possible existing Policy subscription
141 if actionType == e2ap.E2AP_ActionTypePolicy {
142 if subs, ok := r.register[trans.GetSubId()]; ok {
143 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
144 // Update message data to subscription
145 subs.SubReqMsg = subReqMsg
146 subs.SetCachedResponse(nil, true)
151 subs := r.findExistingSubs(trans, subReqMsg)
153 subs, err = r.allocateSubs(trans, subReqMsg)
161 // Add to subscription
164 defer subs.mutex.Unlock()
166 epamount := subs.EpList.Size()
170 // Subscription route updates
173 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
174 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
176 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
177 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
183 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
189 r.register[subs.ReqId.InstanceId] = subs
191 xapp.Logger.Debug("CREATE %s", subs.String())
192 xapp.Logger.Debug("Registry: substable=%v", r.register)
196 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
197 var reportFound bool = false
198 var policyFound bool = false
199 var insertFound bool = false
201 for _, acts := range subReqMsg.ActionSetups {
202 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
205 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
208 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
212 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
213 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
215 if reportFound == true {
216 return e2ap.E2AP_ActionTypeReport, nil
218 if policyFound == true {
219 return e2ap.E2AP_ActionTypePolicy, nil
221 if insertFound == true {
222 return e2ap.E2AP_ActionTypeInsert, nil
224 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
227 // TODO: Works with concurrent calls, but check if can be improved
228 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
231 defer r.mutex.Unlock()
233 defer subs.mutex.Unlock()
235 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
236 epamount := subs.EpList.Size()
237 subId := subs.ReqId.InstanceId
239 if delStatus == false {
244 if waitRouteClean > 0 {
245 time.Sleep(waitRouteClean)
249 defer subs.mutex.Unlock()
250 xapp.Logger.Info("CLEAN %s", subs.String())
254 // Subscription route delete
256 tmpList := xapp.RmrEndpointList{}
257 tmpList.AddEndpoint(trans.GetEndpoint())
258 subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
259 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
262 // Subscription release
265 defer r.mutex.Unlock()
267 if _, ok := r.register[subId]; ok {
268 xapp.Logger.Debug("RELEASE %s", subs.String())
269 delete(r.register, subId)
270 xapp.Logger.Debug("Registry: substable=%v", r.register)
272 r.subIds = append(r.subIds, subId)
274 } else if subs.EpList.Size() > 0 {
276 // Subscription route updates
278 subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
279 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
287 func (r *Registry) GetSubscription(subId uint32) *Subscription {
289 defer r.mutex.Unlock()
290 if _, ok := r.register[subId]; ok {
291 return r.register[subId]
296 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
298 defer r.mutex.Unlock()
299 for _, subId := range subIds {
300 if _, ok := r.register[subId]; ok {
301 return r.register[subId], nil
304 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)