X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=9234a34337d48731da5b4a8cd548852ac3a83b23;hb=b642a19e7527f03458f1b6ca47bca132019aa2cf;hp=70570cd022ea3eb0b6d5c13ba695fd956eb48bad;hpb=55d2a285e4914afce7492c6b4b6feebe5485210b;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 70570cd..9234a34 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -67,6 +67,7 @@ var waitRouteCleanup_ms time.Duration var e2tMaxSubReqTryCount uint64 // Initial try + retry var e2tMaxSubDelReqTryCount uint64 // Initial try + retry var readSubsFromDb string +var restDuplicateCtrl duplicateCtrl type Control struct { *xapp.RMRClient @@ -127,6 +128,8 @@ func NewControl() *Control { return c } + restDuplicateCtrl.Init() + // Read subscriptions from db xapp.Logger.Info("Reading subscriptions from db") subIds, register, err := c.ReadAllSubscriptionsFromSdl() @@ -283,90 +286,117 @@ func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionR return nil, err } - go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId) + err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params) + + if err != nil { + // We were unable to detect whether this request was duplicate or not, proceed + xapp.Logger.Info("%s - proceeding with the request", err.Error()) + } else { + if duplicate { + if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" { + xapp.Logger.Info("Retransmission blocker dropped for report typer of request") + c.UpdateCounter(cRestSubRespToXapp) + return &subResp, nil + } + } + restSubscription.Md5sumOngoing = md5sum + } + + go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint) c.UpdateCounter(cRestSubRespToXapp) return &subResp, nil } +func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error, + clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) { + + // Send notification to xApp that prosessing of a Subscription Request has failed. + e2EventInstanceID := (int64)(0) + errorCause := err.Error() + resp := &models.SubscriptionResponse{ + SubscriptionID: restSubId, + SubscriptionInstances: []*models.SubscriptionInstance{ + &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, + ErrorCause: &errorCause, + XappEventInstanceID: &xAppEventInstanceID}, + }, + } + // Mark REST subscription request processed. + restSubscription.SetProcessed() + if trans != nil { + xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", + errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) + } else { + xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v", + errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID) + } + xapp.Subscription.Notify(resp, *clientEndpoint) + c.UpdateCounter(cRestSubFailNotifToXapp) +} + +func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64, + clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) { + + // Store successfully processed InstanceId for deletion + restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID)) + restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID) + + // Send notification to xApp that a Subscription Request has been processed. + resp := &models.SubscriptionResponse{ + SubscriptionID: restSubId, + SubscriptionInstances: []*models.SubscriptionInstance{ + &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, + ErrorCause: nil, + XappEventInstanceID: &xAppEventInstanceID}, + }, + } + // Mark REST subscription request processesd. + restSubscription.SetProcessed() + xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", + clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) + xapp.Subscription.Notify(resp, *clientEndpoint) + c.UpdateCounter(cRestSubNotifToXapp) +} + //------------------------------------------------------------------- // //------------------------------------------------------------------- func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList, - clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) { + clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) { xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests)) - _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint) - if err != nil { - c.registry.DeleteRESTSubscription(restSubId) - xapp.Logger.Error("XAPP-SubReq transaction not created, endpoint createtion failed for RESTSubId=%s, Meid=%s", *restSubId, *meid) - return - } - var xAppEventInstanceID int64 var e2EventInstanceID int64 - var errorCause string + + defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sumOngoing) + for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ { subReqMsg := subReqList.E2APSubscriptionRequests[index] + xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id) trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid}) if trans == nil { - c.registry.DeleteRESTSubscription(restSubId) - xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid) - return + // Send notification to xApp that prosessing of a Subscription Request has failed. + err := fmt.Errorf("Tracking failure") + c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans) + continue } - defer trans.Release() - xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id) xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans)) + subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId) if err != nil { - // Send notification to xApp that prosessing of a Subscription Request has failed. - e2EventInstanceID = (int64)(0) - errorCause = err.Error() - resp := &models.SubscriptionResponse{ - SubscriptionID: restSubId, - SubscriptionInstances: []*models.SubscriptionInstance{ - &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, - ErrorCause: &errorCause, - XappEventInstanceID: &xAppEventInstanceID}, - }, - } - // Mark REST subscription request processed. - restSubscription.SetProcessed() - xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", - errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) - xapp.Subscription.Notify(resp, *clientEndpoint) - c.UpdateCounter(cRestSubFailNotifToXapp) + c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans) } else { e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId) - xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) - - // Store successfully processed InstanceId for deletion - restSubscription.AddE2InstanceId(subRespMsg.RequestId.InstanceId) - restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID) - - // Send notification to xApp that a Subscription Request has been processed. - resp := &models.SubscriptionResponse{ - SubscriptionID: restSubId, - SubscriptionInstances: []*models.SubscriptionInstance{ - &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, - ErrorCause: nil, - XappEventInstanceID: &xAppEventInstanceID}, - }, - } - // Mark REST subscription request processesd. - restSubscription.SetProcessed() - xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", - clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) - xapp.Subscription.Notify(resp, *clientEndpoint) - c.UpdateCounter(cRestSubNotifToXapp) - + c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans) } + trans.Release() } }