2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
// RESTSubscription is the registry's record of one REST-originated
// subscription and of the E2 subscription instances that belong to it.
type RESTSubscription struct {
	// xAppRmrEndPoint is the RMR endpoint given when the subscription was created.
	xAppRmrEndPoint string
	// Meid is the identifier string given at creation (the maid argument of
	// Registry.CreateRESTSubscription).
	Meid string
	// InstanceIds lists the E2 instance ids attached to this REST subscription.
	InstanceIds []uint32
	// xAppIdToE2Id maps an xApp event instance id to an E2 event instance id.
	xAppIdToE2Id map[int64]int64
	// SubReqOngoing is true while the subscription request is still being processed.
	SubReqOngoing bool
	// SubDelReqOngoing is true once a delete request has claimed the subscription.
	SubDelReqOngoing bool
	// lastReqMd5sum is the md5sum recorded by AddMd5Sum for the latest request.
	lastReqMd5sum string
}
46 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
48 for _, v := range r.InstanceIds {
55 r.InstanceIds = append(r.InstanceIds, instanceId)
58 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
60 r.lastReqMd5sum = md5sum
62 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
66 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
67 r.InstanceIds = r.InstanceIds[1:]
70 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
71 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
74 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
75 return r.xAppIdToE2Id[xAppEventInstanceID]
78 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
79 delete(r.xAppIdToE2Id, xAppEventInstanceID)
// SetProcessed marks the subscription request as no longer being processed by
// clearing SubReqOngoing, one of the two flags Registry.GetRESTSubscription
// checks before it hands the subscription out without an error.
// NOTE(review): no use of err is visible in this view - confirm how a non-nil
// err is meant to affect the subscription.
82 func (r *RESTSubscription) SetProcessed(err error) {
83 r.SubReqOngoing = false
// Registry is the bookkeeping for E2 subscriptions and REST subscriptions.
// NOTE(review): the methods also use r.mutex and r.subIds; their declarations
// are not visible in this view.
89 type Registry struct {
// register maps an E2 subscription instance id to its Subscription.
91 register map[uint32]*Subscription
// rtmgrClient issues the subscription route create/update/delete requests.
93 rtmgrClient *RtmgrClient
// restSubscriptions maps a REST subscription id to its RESTSubscription.
94 restSubscriptions map[string]*RESTSubscription
97 func (r *Registry) Initialize() {
98 r.register = make(map[uint32]*Subscription)
99 r.restSubscriptions = make(map[string]*RESTSubscription)
102 for i = 1; i < 65535; i++ {
103 r.subIds = append(r.subIds, i)
107 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
109 defer r.mutex.Unlock()
110 newRestSubscription := RESTSubscription{}
111 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
112 newRestSubscription.Meid = *maid
113 newRestSubscription.SubReqOngoing = true
114 newRestSubscription.SubDelReqOngoing = false
115 r.restSubscriptions[*restSubId] = &newRestSubscription
116 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
117 xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
118 return &newRestSubscription, nil
121 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
123 defer r.mutex.Unlock()
124 delete(r.restSubscriptions, *restSubId)
125 xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
128 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
130 defer r.mutex.Unlock()
131 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
132 // Subscription deletion is not allowed if prosessing subscription request in not ready
133 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
134 if IsDelReqOngoing == true {
135 restSubscription.SubDelReqOngoing = true
137 r.restSubscriptions[restSubId] = restSubscription
138 return restSubscription, nil
140 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
142 return restSubscription, nil
144 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
147 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
149 defer r.mutex.Unlock()
151 resp := models.SubscriptionList{}
152 for _, subs := range r.register {
154 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs reserves a subscription instance id from the r.subIds pool and
// builds a new Subscription for subReqMsg with the endpoint of trans attached.
// It fails when the pool is empty, when the reserved id is already present in
// r.register, or when the endpoint cannot be added to the new subscription.
// It is called from AssignToSubscription.
// NOTE(review): the statement that assigns subId, most fields of the struct
// literal and the success return are not visible in this view.
160 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
161 if len(r.subIds) > 0 {
// Remove the head of the pool (presumably the id held in subId - TODO confirm).
163 r.subIds = r.subIds[1:]
// The id is unexpectedly still registered: put it back at the tail and give up.
164 if _, ok := r.register[subId]; ok == true {
165 r.subIds = append(r.subIds, subId)
166 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
168 subs := &Subscription{
171 SubReqMsg: subReqMsg,
173 RetryFromXapp: false,
177 DoNotWaitSubResp: false,
// The request id comes from the message; the instance id is the reserved one.
179 subs.ReqId.Id = subReqMsg.RequestId.Id
180 subs.ReqId.InstanceId = subId
181 if resetTestFlag == true {
182 subs.DoNotWaitSubResp = true
// Attaching the endpoint failed: return the id to the pool before failing.
185 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
186 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
187 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when the id pool is empty.
191 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that reports itself
// mergeable with the request (subs.IsMergeable) and tries to add the endpoint
// of trans to it. Candidates that are marked invalid or whose endpoint list is
// empty are checked for explicitly.
// NOTE(review): the lock/unlock, continue and return statements are not
// visible in this view. Judging by the two debug messages, the bool result
// presumably tells the caller whether the requesting endpoint was already
// attached - confirm against AssignToSubscription's endPointFound.
194 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
196 for _, subs := range r.register {
197 if subs.IsMergeable(trans, subReqMsg) {
200 // check if there has been race conditions
203 //subs has been set to invalid
204 if subs.valid == false {
208 // If size is zero, entry is to be deleted
209 if subs.EpList.Size() == 0 {
213 // Try to add to endpointlist. Adding fails if endpoint is already in the list
214 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
216 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
221 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the request carried by trans/subReqMsg to a
// subscription and returns that subscription.
//
// Visible steps, in order:
//   - the action types of subReqMsg are validated with CheckActionTypes;
//   - for a Policy request, an already registered subscription with the
//     transaction's subscription id is updated with the new message;
//   - otherwise findExistingSubs is consulted and, on the allocation path,
//     allocateSubs creates a new subscription;
//   - a route is created (RouteCreate) or updated (RouteCreateUpdate);
//   - the subscription is stored in r.register.
//
// NOTE(review): several guards and return statements (for example the
// conditions choosing between RouteCreate and RouteCreateUpdate, and the
// condition guarding the clean-up near the end) are not visible in this view.
228 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
232 defer r.mutex.Unlock()
235 // Check validity of subscription action types
237 actionType, err := r.CheckActionTypes(subReqMsg)
// The detailed validation error is only logged; callers get a generic one.
239 xapp.Logger.Info("CREATE %s", err)
240 err = fmt.Errorf("E2 content validation failed")
245 // Find possible existing Policy subscription
247 if actionType == e2ap.E2AP_ActionTypePolicy {
248 if subs, ok := r.register[trans.GetSubId()]; ok {
249 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
250 // Update message data to subscription
251 subs.SubReqMsg = subReqMsg
252 subs.SetCachedResponse(nil, true)
257 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
// The detailed allocation error is only logged; callers get a generic one.
259 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
260 xapp.Logger.Error("%s", err.Error())
261 err = fmt.Errorf("subscription not allocated")
265 } else if endPointFound == true {
266 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
267 subs.RetryFromXapp = true
268 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
269 c.UpdateCounter(cDuplicateE2SubReq)
274 // Add to subscription
277 defer subs.mutex.Unlock()
279 epamount := subs.EpList.Size()
280 xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
284 // Subscription route updates
287 err = r.RouteCreate(subs, c)
289 err = r.RouteCreateUpdate(subs, c)
// Clean-up: hand the instance id back to the pool.
295 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
297 // Delete already added endpoint for the request
298 subs.EpList.DelEndpoint(trans.GetEndpoint())
// Store the subscription under its instance id.
303 r.register[subs.ReqId.InstanceId] = subs
305 xapp.Logger.Debug("CREATE %s", subs.String())
306 xapp.Logger.Debug("Registry: substable=%v", r.register)
310 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
311 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
312 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
314 c.UpdateCounter(cRouteCreateFail)
315 xapp.Logger.Error("%s", err.Error())
316 err = fmt.Errorf("RTMGR route create failure")
321 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
322 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
323 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
325 c.UpdateCounter(cRouteCreateUpdateFail)
326 xapp.Logger.Error("%s", err.Error())
327 err = fmt.Errorf("RTMGR route update failure")
330 c.UpdateCounter(cMergedSubscriptions)
334 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
335 var reportFound bool = false
336 var policyFound bool = false
337 var insertFound bool = false
339 for _, acts := range subReqMsg.ActionSetups {
340 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
343 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
346 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
350 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
351 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
353 if reportFound == true {
354 return e2ap.E2AP_ActionTypeReport, nil
356 if policyFound == true {
357 return e2ap.E2AP_ActionTypePolicy, nil
359 if insertFound == true {
360 return e2ap.E2AP_ActionTypeInsert, nil
362 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
365 // TODO: Works with concurrent calls, but check if can be improved
// RemoveFromSubscription detaches the endpoint of trans from subs and then
// cleans up: the visible code deletes the route (RouteDelete), removes the
// subscription from r.register and returns its instance id to r.subIds, or
// updates the route (RouteDeleteUpdate) while endpoints remain.
// When waitRouteClean is positive the clean-up sleeps for that long first.
// NOTE(review): the registry and subscription mutexes are each released by
// two deferred Unlock calls below, so the second pair presumably belongs to a
// nested function whose opening is not visible in this view. The guards on
// delStatus and epamount are not fully visible either. Confirm before
// changing any ordering here.
366 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
369 defer r.mutex.Unlock()
371 defer subs.mutex.Unlock()
// Capture the endpoint count and the instance id right after the removal.
373 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
374 epamount := subs.EpList.Size()
375 subId := subs.ReqId.InstanceId
377 if delStatus == false {
382 if waitRouteClean > 0 {
383 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
384 time.Sleep(waitRouteClean)
388 defer subs.mutex.Unlock()
389 xapp.Logger.Info("CLEAN %s", subs.String())
393 // Subscription route delete
395 r.RouteDelete(subs, trans, c)
398 // Subscription release
401 defer r.mutex.Unlock()
403 if _, ok := r.register[subId]; ok {
404 xapp.Logger.Debug("RELEASE %s", subs.String())
405 delete(r.register, subId)
406 xapp.Logger.Debug("Registry: substable=%v", r.register)
// The instance id becomes available for allocation again.
408 r.subIds = append(r.subIds, subId)
409 } else if subs.EpList.Size() > 0 {
411 // Subscription route update
413 r.RouteDeleteUpdate(subs, c)
420 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
421 tmpList := xapp.RmrEndpointList{}
422 tmpList.AddEndpoint(trans.GetEndpoint())
423 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
424 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
425 c.UpdateCounter(cRouteDeleteFail)
429 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
430 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
431 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
432 c.UpdateCounter(cRouteDeleteUpdateFail)
// UpdateSubscriptionToDb brings the stored copy of subs in line with the
// registry: the visible code either removes the subscription from the
// database (c.RemoveSubscriptionFromDb) or writes it back
// (c.WriteSubscriptionToDb) and bumps the cUnmergedSubscriptions counter.
// NOTE(review): epamount is computed but the guard that uses it is not visible
// in this view - confirm which branch it selects.
436 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
438 defer r.mutex.Unlock()
440 defer subs.mutex.Unlock()
442 epamount := subs.EpList.Size()
444 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
445 // Not merged subscription is being deleted
446 c.RemoveSubscriptionFromDb(subs)
449 } else if subs.EpList.Size() > 0 {
450 // Endpoint of merged subscription is being deleted
451 c.WriteSubscriptionToDb(subs)
452 c.UpdateCounter(cUnmergedSubscriptions)
456 func (r *Registry) GetSubscription(subId uint32) *Subscription {
458 defer r.mutex.Unlock()
459 if _, ok := r.register[subId]; ok {
460 return r.register[subId]
465 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
467 defer r.mutex.Unlock()
468 for _, subId := range subIds {
469 if _, ok := r.register[subId]; ok {
470 return r.register[subId], nil
473 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)