X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=848b916a270e6464efb9fd0a39218f3ad63d5aba;hb=3cdd2e0c0d042b816f6d35f68a93f97fbbe7efc1;hp=aaad62565598bca33f81ff264033762396dee813;hpb=f682ace08a827bd260e4905b5ee1bddacf01c6e0;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index aaad625..848b916 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -72,21 +72,22 @@ var waitRouteCleanup_ms time.Duration var e2tMaxSubReqTryCount uint64 // Initial try + retry var e2tMaxSubDelReqTryCount uint64 // Initial try + retry var readSubsFromDb string -var restDuplicateCtrl duplicateCtrl var dbRetryForever string var dbTryCount int type Control struct { *xapp.RMRClient - e2ap *E2ap - registry *Registry - tracker *Tracker - e2SubsDb Sdlnterface - restSubsDb Sdlnterface - CntRecvMsg uint64 - ResetTestFlag bool - Counters map[string]xapp.Counter - LoggerLevel int + e2ap *E2ap + registry *Registry + tracker *Tracker + restDuplicateCtrl *DuplicateCtrl + e2SubsDb Sdlnterface + restSubsDb Sdlnterface + CntRecvMsg uint64 + ResetTestFlag bool + Counters map[string]xapp.Counter + LoggerLevel int + UTTesting bool } type RMRMeid struct { @@ -132,13 +133,17 @@ func NewControl() *Control { tracker := new(Tracker) tracker.Init() + restDuplicateCtrl := new(DuplicateCtrl) + restDuplicateCtrl.Init() + c := &Control{e2ap: new(E2ap), - registry: registry, - tracker: tracker, - e2SubsDb: CreateSdl(), - restSubsDb: CreateRESTSdl(), - Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"), - LoggerLevel: 3, + registry: registry, + tracker: tracker, + restDuplicateCtrl: restDuplicateCtrl, + e2SubsDb: CreateSdl(), + restSubsDb: CreateRESTSdl(), + Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"), + LoggerLevel: 4, } c.ReadConfigParameters("") @@ -147,17 +152,16 @@ func NewControl() *Control { xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET") xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET") - go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) - if readSubsFromDb == "false" { return c } - restDuplicateCtrl.Init() - // Read subscriptions from db c.ReadE2Subscriptions() c.ReadRESTSubscriptions() + + go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) + return c } @@ -318,13 +322,13 @@ func (c *Control) Run() { //------------------------------------------------------------------- // //------------------------------------------------------------------- -func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, string, error) { +func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) { var restSubId string var restSubscription *RESTSubscription var err error - prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum) + prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum) if p.SubscriptionID == "" { // Subscription does not contain REST subscription Id if exists { @@ -339,7 +343,7 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s } } else { xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum) - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) } } @@ -357,7 +361,7 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s // Subscription with id in REST request does not exist xapp.Logger.Error("%s", err.Error()) c.UpdateCounter(cRestSubFailToXapp) - return nil, "", models.SubscriptionInstanceRejectCauseRESTSubscriptionWithGivenIDDoesNotExist, err + return nil, "", err } if !exists { @@ -367,7 +371,7 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s } } - return restSubscription, restSubId, "", nil + return restSubscription, restSubId, nil } //------------------------------------------------------------------- @@ -389,14 +393,14 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript err := fmt.Errorf("ClientEndpoint == nil") xapp.Logger.Error("%v", err) c.UpdateCounter(cRestSubFailToXapp) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + return nil, common.SubscribeBadRequestCode } _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint) if err != nil { xapp.Logger.Error("%s", err.Error()) c.UpdateCounter(cRestSubFailToXapp) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + return nil, common.SubscribeBadRequestCode } md5sum, err := CalculateRequestMd5sum(params) @@ -404,10 +408,10 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error()) } - restSubscription, restSubId, rejectCause, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint) + restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint) if err != nil { - xapp.Logger.Error("Failed to get/allocate REST subscription") - return c.GetSubscriptionResponse(rejectCause, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + xapp.Logger.Error("Subscription with id in REST request does not exist") + return nil, common.SubscribeNotFoundCode } subResp.SubscriptionID = &restSubId @@ -415,13 +419,13 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription) if err != nil { xapp.Logger.Error("%s", err.Error()) - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) c.registry.DeleteRESTSubscription(&restSubId) c.UpdateCounter(cRestSubFailToXapp) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + return nil, common.SubscribeBadRequestCode } - duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum) + duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum) if duplicate { err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum) xapp.Logger.Debug("%s", err) @@ -433,7 +437,7 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p) if err != nil { xapp.Logger.Error("%s", err) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + return nil, common.SubscribeBadRequestCode } go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives) @@ -468,12 +472,7 @@ func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount) } } - if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil { - xapp.Logger.Error("p.E2SubscriptionDirectives.RMRRoutingNeeded == nil") - e2SubscriptionDirectives.CreateRMRRoute = true - } else { - e2SubscriptionDirectives.CreateRMRRoute = *p.E2SubscriptionDirectives.RMRRoutingNeeded - } + e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded } xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue) xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount) @@ -481,24 +480,6 @@ func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2 return e2SubscriptionDirectives, nil } -//------------------------------------------------------------------- -// -//------------------------------------------------------------------- -func (c *Control) GetSubscriptionResponse(rejectCause string, errorCause string, errorSource string, timeoutType string) *models.SubscriptionResponse { - subResp := models.SubscriptionResponse{} - subscriptionInstance := models.SubscriptionInstance{} - subscriptionInstance.RejectCause = &rejectCause - subscriptionInstance.ErrorCause = &errorCause - subscriptionInstance.ErrorSource = &errorSource - if timeoutType != "" { - subscriptionInstance.TimeoutType = &timeoutType - } - subResp.SubscriptionInstances = append(subResp.SubscriptionInstances, &subscriptionInstance) - xapp.Logger.Error("etSubscriptionResponse() %+v", subscriptionInstance) - - return &subResp -} - //------------------------------------------------------------------- // //------------------------------------------------------------------- @@ -506,13 +487,14 @@ func (c *Control) GetSubscriptionResponse(rejectCause string, errorCause string, func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList, clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) { + c.SubscriptionProcessingStartDelay() xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests)) var xAppEventInstanceID int64 var e2EventInstanceID int64 errorInfo := &ErrorInfo{} - defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum) + defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum) for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ { subReqMsg := subReqList.E2APSubscriptionRequests[index] @@ -532,6 +514,7 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives) xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans)) + trans.Release() if err != nil { c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo) @@ -542,7 +525,19 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans) } - trans.Release() + } +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------ +func (c *Control) SubscriptionProcessingStartDelay() { + if c.UTTesting == true { + // This is temporary fix for the UT problem that notification arrives before subscription response + // Correct fix would be to allow notification come before response and process it correctly + xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions") + <-time.After(time.Millisecond * 50) + xapp.Logger.Debug("Continuing after delay") } } @@ -624,9 +619,9 @@ func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSu SubscriptionID: restSubId, SubscriptionInstances: []*models.SubscriptionInstance{ &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, - ErrorCause: &errorInfo.ErrorCause, - ErrorSource: &errorInfo.ErrorSource, - TimeoutType: &errorInfo.TimeoutType, + ErrorCause: errorInfo.ErrorCause, + ErrorSource: errorInfo.ErrorSource, + TimeoutType: errorInfo.TimeoutType, XappEventInstanceID: &xAppEventInstanceID}, }, } @@ -660,7 +655,7 @@ func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubs SubscriptionID: restSubId, SubscriptionInstances: []*models.SubscriptionInstance{ &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, - ErrorCause: nil, + ErrorCause: "", XappEventInstanceID: &xAppEventInstanceID}, }, } @@ -715,7 +710,7 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID) restSubscription.DeleteE2InstanceId(instanceId) } - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum) c.registry.DeleteRESTSubscription(&restSubId) c.RemoveRESTSubscriptionFromDb(restSubId) }() @@ -1506,11 +1501,7 @@ func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) { fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount) } fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue) - if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil { - fmt.Println(" RMRRoutingNeeded == nil") - } else { - fmt.Printf(" RMRRoutingNeeded = %v\n", *p.E2SubscriptionDirectives.RMRRoutingNeeded) - } + fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded) } for _, subscriptionDetail := range p.SubscriptionDetails { if p.RANFunctionID != nil {