2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
29 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 //-----------------------------------------------------------------------------
36 //-----------------------------------------------------------------------------
38 type RESTSubscription struct {
39 xAppRmrEndPoint string
42 xAppIdToE2Id map[int64]int64
48 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
50 for _, v := range r.InstanceIds {
57 r.InstanceIds = append(r.InstanceIds, instanceId)
60 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
62 r.lastReqMd5sum = md5sum
64 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
68 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
69 r.InstanceIds = r.InstanceIds[1:]
72 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
73 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
76 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
77 return r.xAppIdToE2Id[xAppEventInstanceID]
80 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
81 delete(r.xAppIdToE2Id, xAppEventInstanceID)
84 func (r *RESTSubscription) SetProcessed(err error) {
85 r.SubReqOngoing = false
91 type Registry struct {
93 register map[uint32]*Subscription
95 rtmgrClient *RtmgrClient
96 restSubscriptions map[string]*RESTSubscription
99 func (r *Registry) Initialize() {
100 r.register = make(map[uint32]*Subscription)
101 r.restSubscriptions = make(map[string]*RESTSubscription)
104 for i = 1; i < 65535; i++ {
105 r.subIds = append(r.subIds, i)
109 func (r *Registry) GetAllRestSubscriptions() []byte {
111 defer r.mutex.Unlock()
112 restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
114 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
116 return restSubscriptionsJson
119 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) *RESTSubscription {
121 defer r.mutex.Unlock()
122 newRestSubscription := RESTSubscription{}
123 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
124 newRestSubscription.Meid = *maid
125 newRestSubscription.SubReqOngoing = true
126 newRestSubscription.SubDelReqOngoing = false
127 r.restSubscriptions[*restSubId] = &newRestSubscription
128 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
129 xapp.Logger.Debug("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
130 return &newRestSubscription
133 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
135 defer r.mutex.Unlock()
136 delete(r.restSubscriptions, *restSubId)
137 xapp.Logger.Debug("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
140 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
142 defer r.mutex.Unlock()
143 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
144 // Subscription deletion is not allowed if prosessing subscription request in not ready
145 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
146 if IsDelReqOngoing == true {
147 restSubscription.SubDelReqOngoing = true
149 r.restSubscriptions[restSubId] = restSubscription
150 return restSubscription, nil
152 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
155 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
158 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
160 defer r.mutex.Unlock()
162 resp := models.SubscriptionList{}
163 for _, subs := range r.register {
165 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
171 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, rmrRoutecreated bool) (*Subscription, error) {
172 if len(r.subIds) > 0 {
174 r.subIds = r.subIds[1:]
175 if _, ok := r.register[subId]; ok == true {
176 r.subIds = append(r.subIds, subId)
177 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
179 subs := &Subscription{
182 RMRRouteCreated: rmrRoutecreated,
183 SubReqMsg: subReqMsg,
186 RetryFromXapp: false,
190 DoNotWaitSubResp: false,
192 subs.ReqId.Id = subReqMsg.RequestId.Id
193 subs.ReqId.InstanceId = subId
194 r.SetResetTestFlag(resetTestFlag, subs)
196 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
197 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
198 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
202 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
205 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
207 for _, subs := range r.register {
208 if subs.IsMergeable(trans, subReqMsg) {
211 // check if there has been race conditions
214 //subs has been set to invalid
215 if subs.valid == false {
219 // If size is zero, entry is to be deleted
220 if subs.EpList.Size() == 0 {
224 // Try to add to endpointlist. Adding fails if endpoint is already in the list
225 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
227 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
232 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
239 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control, createRMRRoute bool) (*Subscription, ErrorInfo, error) {
242 errorInfo := ErrorInfo{}
244 defer r.mutex.Unlock()
247 // Check validity of subscription action types
249 actionType, err := r.CheckActionTypes(subReqMsg)
251 xapp.Logger.Debug("CREATE %s", err)
252 err = fmt.Errorf("E2 content validation failed")
253 return nil, errorInfo, err
257 // Find possible existing Policy subscription
259 if actionType == e2ap.E2AP_ActionTypePolicy {
260 if subs, ok := r.register[trans.GetSubId()]; ok {
261 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
262 // Update message data to subscription
263 subs.SubReqMsg = subReqMsg
264 subs.PolicyUpdate = true
265 subs.SetCachedResponse(nil, true)
266 r.SetResetTestFlag(resetTestFlag, subs)
267 return subs, errorInfo, nil
271 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
273 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag, createRMRRoute); err != nil {
274 xapp.Logger.Error("%s", err.Error())
275 err = fmt.Errorf("subscription not allocated")
276 return nil, errorInfo, err
279 } else if endPointFound == true {
280 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
281 subs.RetryFromXapp = true
282 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
283 c.UpdateCounter(cDuplicateE2SubReq)
284 return subs, errorInfo, nil
288 // Add to subscription
291 defer subs.mutex.Unlock()
293 epamount := subs.EpList.Size()
294 xapp.Logger.Debug("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
298 // Subscription route updates
300 if createRMRRoute == true {
302 errorInfo, err = r.RouteCreate(subs, c)
304 errorInfo, err = r.RouteCreateUpdate(subs, c)
307 xapp.Logger.Debug("RMR route not created: createRMRRoute=%v", createRMRRoute)
313 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
315 // Delete already added endpoint for the request
316 subs.EpList.DelEndpoint(trans.GetEndpoint())
317 return nil, errorInfo, err
321 r.register[subs.ReqId.InstanceId] = subs
323 xapp.Logger.Debug("CREATE %s", subs.String())
324 xapp.Logger.Debug("Registry: substable=%v", r.register)
325 return subs, errorInfo, nil
328 func (r *Registry) RouteCreate(subs *Subscription, c *Control) (ErrorInfo, error) {
329 errorInfo := ErrorInfo{}
330 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
331 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
333 if strings.Contains(err.Error(), "status 400") {
334 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
336 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
338 errorInfo.ErrorCause = err.Error()
339 c.UpdateCounter(cRouteCreateFail)
340 xapp.Logger.Error("%s", err.Error())
341 err = fmt.Errorf("RTMGR route create failure")
343 return errorInfo, err
346 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) (ErrorInfo, error) {
347 errorInfo := ErrorInfo{}
348 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
349 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
351 if strings.Contains(err.Error(), "status 400") {
352 errorInfo.TimeoutType = models.SubscriptionInstanceTimeoutTypeRTMGRTimeout
354 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceRTMGR
356 errorInfo.ErrorCause = err.Error()
357 c.UpdateCounter(cRouteCreateUpdateFail)
358 xapp.Logger.Error("%s", err.Error())
359 err = fmt.Errorf("RTMGR route update failure")
360 return errorInfo, err
362 c.UpdateCounter(cMergedSubscriptions)
363 return errorInfo, err
366 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
367 var reportFound bool = false
368 var policyFound bool = false
369 var insertFound bool = false
371 for _, acts := range subReqMsg.ActionSetups {
372 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
375 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
378 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
382 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
383 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
385 if reportFound == true {
386 return e2ap.E2AP_ActionTypeReport, nil
388 if policyFound == true {
389 return e2ap.E2AP_ActionTypePolicy, nil
391 if insertFound == true {
392 return e2ap.E2AP_ActionTypeInsert, nil
394 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
397 // TODO: Works with concurrent calls, but check if can be improved
398 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
401 defer r.mutex.Unlock()
403 defer subs.mutex.Unlock()
405 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
406 epamount := subs.EpList.Size()
407 subId := subs.ReqId.InstanceId
409 if delStatus == false {
414 if waitRouteClean > 0 {
415 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
416 time.Sleep(waitRouteClean)
420 defer subs.mutex.Unlock()
421 xapp.Logger.Debug("CLEAN %s", subs.String())
425 // Subscription route delete
427 if subs.RMRRouteCreated == true {
428 r.RouteDelete(subs, trans, c)
432 // Subscription release
435 defer r.mutex.Unlock()
437 if _, ok := r.register[subId]; ok {
438 xapp.Logger.Debug("RELEASE %s", subs.String())
439 delete(r.register, subId)
440 xapp.Logger.Debug("Registry: substable=%v", r.register)
442 r.subIds = append(r.subIds, subId)
443 } else if subs.EpList.Size() > 0 {
445 // Subscription route update
447 if subs.RMRRouteCreated == true {
448 r.RouteDeleteUpdate(subs, c)
456 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
457 tmpList := xapp.RmrEndpointList{}
458 tmpList.AddEndpoint(trans.GetEndpoint())
459 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
460 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
461 c.UpdateCounter(cRouteDeleteFail)
465 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
466 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
467 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
468 c.UpdateCounter(cRouteDeleteUpdateFail)
472 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
474 defer r.mutex.Unlock()
476 defer subs.mutex.Unlock()
478 epamount := subs.EpList.Size()
480 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
481 // Not merged subscription is being deleted
482 c.RemoveSubscriptionFromDb(subs)
485 } else if subs.EpList.Size() > 0 {
486 // Endpoint of merged subscription is being deleted
487 c.WriteSubscriptionToDb(subs)
488 c.UpdateCounter(cUnmergedSubscriptions)
492 func (r *Registry) GetSubscription(subId uint32) *Subscription {
494 defer r.mutex.Unlock()
495 if _, ok := r.register[subId]; ok {
496 return r.register[subId]
501 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
503 defer r.mutex.Unlock()
504 for _, subId := range subIds {
505 if _, ok := r.register[subId]; ok {
506 return r.register[subId], nil
509 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)
512 func (r *Registry) SetResetTestFlag(resetTestFlag bool, subs *Subscription) {
513 if resetTestFlag == true {
514 // This is used in submgr restart unit tests
515 xapp.Logger.Debug("resetTestFlag == true")
516 subs.DoNotWaitSubResp = true
518 xapp.Logger.Debug("resetTestFlag == false")