2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
36 type RESTSubscription struct {
37 xAppRmrEndPoint string
40 xAppIdToE2Id map[int64]int64
45 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
46 r.InstanceIds = append(r.InstanceIds, instanceId)
49 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
50 r.InstanceIds = r.InstanceIds[1:]
53 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
54 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
57 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
58 return r.xAppIdToE2Id[xAppEventInstanceID]
61 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
62 delete(r.xAppIdToE2Id, xAppEventInstanceID)
65 func (r *RESTSubscription) SetProcessed() {
66 r.SubReqOngoing = false
69 type Registry struct {
71 register map[uint32]*Subscription
73 rtmgrClient *RtmgrClient
74 restSubscriptions map[string]*RESTSubscription
77 func (r *Registry) Initialize() {
78 r.register = make(map[uint32]*Subscription)
79 r.restSubscriptions = make(map[string]*RESTSubscription)
82 for i = 1; i < 65535; i++ {
83 r.subIds = append(r.subIds, i)
87 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
89 defer r.mutex.Unlock()
90 newRestSubscription := RESTSubscription{}
91 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
92 newRestSubscription.Meid = *maid
93 newRestSubscription.SubReqOngoing = true
94 newRestSubscription.SubDelReqOngoing = false
95 r.restSubscriptions[*restSubId] = &newRestSubscription
96 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
97 xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
98 return &newRestSubscription, nil
101 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
103 defer r.mutex.Unlock()
104 delete(r.restSubscriptions, *restSubId)
105 xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
108 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
110 defer r.mutex.Unlock()
111 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
112 // Subscription deletion is not allowed if prosessing subscription request in not ready
113 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
114 if IsDelReqOngoing == true {
115 restSubscription.SubDelReqOngoing = true
117 r.restSubscriptions[restSubId] = restSubscription
118 return restSubscription, nil
120 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
122 return restSubscription, nil
124 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
127 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
129 defer r.mutex.Unlock()
131 resp := models.SubscriptionList{}
132 for _, subs := range r.register {
134 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
140 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
141 if len(r.subIds) > 0 {
143 r.subIds = r.subIds[1:]
144 if _, ok := r.register[subId]; ok == true {
145 r.subIds = append(r.subIds, subId)
146 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
148 subs := &Subscription{
151 SubReqMsg: subReqMsg,
153 RetryFromXapp: false,
157 DoNotWaitSubResp: false,
159 subs.ReqId.Id = subReqMsg.RequestId.Id
160 subs.ReqId.InstanceId = subId
161 if resetTestFlag == true {
162 subs.DoNotWaitSubResp = true
165 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
166 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
167 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
171 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
174 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
176 for _, subs := range r.register {
177 if subs.IsMergeable(trans, subReqMsg) {
180 // check if there has been race conditions
183 //subs has been set to invalid
184 if subs.valid == false {
188 // If size is zero, entry is to be deleted
189 if subs.EpList.Size() == 0 {
193 // Try to add to endpointlist. Adding fails if endpoint is already in the list
194 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
196 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
201 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
208 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
212 defer r.mutex.Unlock()
215 // Check validity of subscription action types
217 actionType, err := r.CheckActionTypes(subReqMsg)
219 xapp.Logger.Debug("CREATE %s", err)
224 // Find possible existing Policy subscription
226 if actionType == e2ap.E2AP_ActionTypePolicy {
227 if subs, ok := r.register[trans.GetSubId()]; ok {
228 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
229 // Update message data to subscription
230 subs.SubReqMsg = subReqMsg
231 subs.SetCachedResponse(nil, true)
236 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
238 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
242 } else if endPointFound == true {
243 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
244 subs.RetryFromXapp = true
245 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
246 c.UpdateCounter(cDuplicateE2SubReq)
251 // Add to subscription
254 defer subs.mutex.Unlock()
256 epamount := subs.EpList.Size()
257 xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
261 // Subscription route updates
264 err = r.RouteCreate(subs, c)
266 err = r.RouteCreateUpdate(subs, c)
272 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
274 // Delete already added endpoint for the request
275 subs.EpList.DelEndpoint(trans.GetEndpoint())
280 r.register[subs.ReqId.InstanceId] = subs
282 xapp.Logger.Debug("CREATE %s", subs.String())
283 xapp.Logger.Debug("Registry: substable=%v", r.register)
287 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
288 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
289 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
291 c.UpdateCounter(cRouteCreateFail)
296 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
297 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
298 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
300 c.UpdateCounter(cRouteCreateUpdateFail)
303 c.UpdateCounter(cMergedSubscriptions)
307 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
308 var reportFound bool = false
309 var policyFound bool = false
310 var insertFound bool = false
312 for _, acts := range subReqMsg.ActionSetups {
313 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
316 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
319 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
323 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
324 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
326 if reportFound == true {
327 return e2ap.E2AP_ActionTypeReport, nil
329 if policyFound == true {
330 return e2ap.E2AP_ActionTypePolicy, nil
332 if insertFound == true {
333 return e2ap.E2AP_ActionTypeInsert, nil
335 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
338 // TODO: Works with concurrent calls, but check if can be improved
339 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
342 defer r.mutex.Unlock()
344 defer subs.mutex.Unlock()
346 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
347 epamount := subs.EpList.Size()
348 subId := subs.ReqId.InstanceId
350 if delStatus == false {
355 if waitRouteClean > 0 {
356 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
357 time.Sleep(waitRouteClean)
361 defer subs.mutex.Unlock()
362 xapp.Logger.Info("CLEAN %s", subs.String())
366 // Subscription route delete
368 r.RouteDelete(subs, trans, c)
371 // Subscription release
374 defer r.mutex.Unlock()
376 if _, ok := r.register[subId]; ok {
377 xapp.Logger.Debug("RELEASE %s", subs.String())
378 delete(r.register, subId)
379 xapp.Logger.Debug("Registry: substable=%v", r.register)
381 r.subIds = append(r.subIds, subId)
382 } else if subs.EpList.Size() > 0 {
384 // Subscription route update
386 r.RouteDeleteUpdate(subs, c)
393 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
394 tmpList := xapp.RmrEndpointList{}
395 tmpList.AddEndpoint(trans.GetEndpoint())
396 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
397 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
398 c.UpdateCounter(cRouteDeleteFail)
402 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
403 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
404 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
405 c.UpdateCounter(cRouteDeleteUpdateFail)
409 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
411 defer r.mutex.Unlock()
413 defer subs.mutex.Unlock()
415 epamount := subs.EpList.Size()
417 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
418 // Not merged subscription is being deleted
419 c.RemoveSubscriptionFromDb(subs)
422 } else if subs.EpList.Size() > 0 {
423 // Endpoint of merged subscription is being deleted
424 c.WriteSubscriptionToDb(subs)
425 c.UpdateCounter(cUnmergedSubscriptions)
429 func (r *Registry) GetSubscription(subId uint32) *Subscription {
431 defer r.mutex.Unlock()
432 if _, ok := r.register[subId]; ok {
433 return r.register[subId]
438 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
440 defer r.mutex.Unlock()
441 for _, subId := range subIds {
442 if _, ok := r.register[subId]; ok {
443 return r.register[subId], nil
446 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)