var e2tMaxSubReqTryCount uint64 // Initial try + retry
var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
var readSubsFromDb string
+var restDuplicateCtrl duplicateCtrl
type Control struct {
*xapp.RMRClient
return c
}
+ restDuplicateCtrl.Init()
+
// Read subscriptions from db
xapp.Logger.Info("Reading subscriptions from db")
subIds, register, err := c.ReadAllSubscriptionsFromSdl()
return nil, err
}
- go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
+ err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params)
+
+ if err != nil {
+ // We were unable to detect whether this request was duplicate or not, proceed
+ xapp.Logger.Info("%s - proceeding with the request", err.Error())
+ } else {
+ if duplicate {
+ if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" {
+ xapp.Logger.Info("Retransmission blocker dropped for report typer of request")
+ c.UpdateCounter(cRestSubRespToXapp)
+ return &subResp, nil
+ }
+ }
+ restSubscription.Md5sumOngoing = md5sum
+ }
+
+ go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint)
c.UpdateCounter(cRestSubRespToXapp)
return &subResp, nil
}
+func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
+ clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
+
+ // Send notification to xApp that prosessing of a Subscription Request has failed.
+ e2EventInstanceID := (int64)(0)
+ errorCause := err.Error()
+ resp := &models.SubscriptionResponse{
+ SubscriptionID: restSubId,
+ SubscriptionInstances: []*models.SubscriptionInstance{
+ &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
+ ErrorCause: &errorCause,
+ XappEventInstanceID: &xAppEventInstanceID},
+ },
+ }
+ // Mark REST subscription request processed.
+ restSubscription.SetProcessed()
+ if trans != nil {
+ xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+ errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
+ } else {
+ xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
+ errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
+ }
+
+ c.UpdateCounter(cRestSubFailNotifToXapp)
+ xapp.Subscription.Notify(resp, *clientEndpoint)
+}
+
+func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
+ clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
+
+ // Store successfully processed InstanceId for deletion
+ restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
+ restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
+
+ // Send notification to xApp that a Subscription Request has been processed.
+ resp := &models.SubscriptionResponse{
+ SubscriptionID: restSubId,
+ SubscriptionInstances: []*models.SubscriptionInstance{
+ &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
+ ErrorCause: nil,
+ XappEventInstanceID: &xAppEventInstanceID},
+ },
+ }
+ // Mark REST subscription request processesd.
+ restSubscription.SetProcessed()
+ xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+ clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
+
+ c.UpdateCounter(cRestSubNotifToXapp)
+ xapp.Subscription.Notify(resp, *clientEndpoint)
+}
+
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
- clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
+ clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) {
xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
- _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
- if err != nil {
- xapp.Logger.Error("%s", err.Error())
- return
- }
-
var xAppEventInstanceID int64
var e2EventInstanceID int64
+
+ defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sumOngoing)
+
for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
subReqMsg := subReqList.E2APSubscriptionRequests[index]
+ xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
if trans == nil {
- c.registry.DeleteRESTSubscription(restSubId)
- xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
- return
+ // Send notification to xApp that prosessing of a Subscription Request has failed.
+ err := fmt.Errorf("Tracking failure")
+ c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
+ continue
}
- defer trans.Release()
- xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
+
subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
if err != nil {
- // Send notification to xApp that prosessing of a Subscription Request has failed.
- e2EventInstanceID = (int64)(0)
- resp := &models.SubscriptionResponse{
- SubscriptionID: restSubId,
- SubscriptionInstances: []*models.SubscriptionInstance{
- &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
- ErrorCause: nil, //TODO: Suitable Error cause.
- XappEventInstanceID: &xAppEventInstanceID},
- },
- }
- // Mark REST subscription request processed.
- restSubscription.SetProcessed()
- xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
- clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
- xapp.Subscription.Notify(resp, *clientEndpoint)
- c.UpdateCounter(cRestSubFailNotifToXapp)
+ c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
} else {
e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
-
xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
-
- // Store successfully processed InstanceId for deletion
- restSubscription.AddE2InstanceId(subRespMsg.RequestId.InstanceId)
- restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
-
- // Send notification to xApp that a Subscription Request has been processed.
- resp := &models.SubscriptionResponse{
- SubscriptionID: restSubId,
- SubscriptionInstances: []*models.SubscriptionInstance{
- &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
- ErrorCause: nil,
- XappEventInstanceID: &xAppEventInstanceID},
- },
- }
- // Mark REST subscription request processesd.
- restSubscription.SetProcessed()
- xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
- clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
- xapp.Subscription.Notify(resp, *clientEndpoint)
- c.UpdateCounter(cRestSubNotifToXapp)
-
+ c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
}
+ trans.Release()
}
}
err := c.tracker.Track(trans)
if err != nil {
- err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
- xapp.Logger.Error("%s", err.Error())
+ xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
+ err = fmt.Errorf("Tracking failure")
return nil, err
}
subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
if err != nil {
- err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
- xapp.Logger.Error("%s", err.Error())
+ xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
return nil, err
}
trans.Release()
return themsg, nil
case *e2ap.E2APSubscriptionFailure:
- err = fmt.Errorf("SubscriptionFailure received")
+ err = fmt.Errorf("E2 SubscriptionFailure received")
return nil, err
default:
+ err = fmt.Errorf("unexpected E2 subscription response received")
break
}
+ } else {
+ err = fmt.Errorf("E2 subscription response timeout")
}
- err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
- xapp.Logger.Error("%s", err.Error())
+
+ xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
return nil, err
}
func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
fmt.Println("CRESTSubscriptionRequest")
+
+ if p.SubscriptionID != "" {
+ fmt.Println(" SubscriptionID = ", p.SubscriptionID)
+ } else {
+ fmt.Println(" SubscriptionID = ''")
+ }
+
fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
if p.ClientEndpoint.HTTPPort != nil {