+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
+
+ var restSubId string
+ var restSubscription *RESTSubscription
+ var err error
+
+ prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+ if p.SubscriptionID == "" {
+ if exists {
+ restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
+ if restSubscription != nil {
+ restSubId = prevRestSubsId
+ if err == nil {
+ xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
+ } else {
+ xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
+ }
+ } else {
+ xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
+ restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+ }
+ }
+
+ if restSubscription == nil {
+ restSubId = ksuid.New().String()
+ restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ c.UpdateCounter(cRestSubFailToXapp)
+ return nil, "", err
+ }
+ }
+ } else {
+ restSubId = p.SubscriptionID
+
+ xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
+
+ restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ c.UpdateCounter(cRestSubFailToXapp)
+ return nil, "", err
+ }
+
+ if !exists {
+ xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
+ } else {
+ xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
+ }
+ }
+
+ return restSubscription, restSubId, nil
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
+
+ c.CntRecvMsg++
+ c.UpdateCounter(cRestSubReqFromXapp)
+
+ subResp := models.SubscriptionResponse{}
+ p := params.(*models.SubscriptionParams)
+
+ if c.LoggerLevel > 2 {
+ c.PrintRESTSubscriptionRequest(p)
+ }
+
+ if p.ClientEndpoint == nil {
+ xapp.Logger.Error("ClientEndpoint == nil")
+ c.UpdateCounter(cRestSubFailToXapp)
+ return nil, fmt.Errorf("")
+ }
+
+ _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ c.UpdateCounter(cRestSubFailToXapp)
+ return nil, err
+ }
+
+ md5sum, err := CalculateRequestMd5sum(params)
+ if err != nil {
+ xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
+ }
+
+ restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
+ if err != nil {
+ xapp.Logger.Error("Failed to get/allocate REST subscription")
+ return nil, err
+ }
+
+ subResp.SubscriptionID = &restSubId
+ subReqList := e2ap.SubscriptionRequestList{}
+ err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+ c.registry.DeleteRESTSubscription(&restSubId)
+ c.UpdateCounter(cRestSubFailToXapp)
+ return nil, err
+ }
+
+ duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
+ if duplicate {
+ xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
+ c.UpdateCounter(cRestSubRespToXapp)
+ return &subResp, nil
+ }
+
+ c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
+
+ go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
+
+ c.UpdateCounter(cRestSubRespToXapp)
+ return &subResp, nil
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+
+func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
+ clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
+
+ xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
+
+ var xAppEventInstanceID int64
+ var e2EventInstanceID int64
+
+ defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
+
+ for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
+ subReqMsg := subReqList.E2APSubscriptionRequests[index]
+ xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
+
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
+ if trans == nil {
+ // Send notification to xApp that prosessing of a Subscription Request has failed.
+ err := fmt.Errorf("Tracking failure")
+ c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
+ continue
+ }
+
+ xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
+
+ subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
+
+ xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
+
+ if err != nil {
+ c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
+ } else {
+ e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
+ restSubscription.AddMd5Sum(md5sum)
+ xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+ index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
+ c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
+ }
+ trans.Release()
+ }
+}
+
+//-------------------------------------------------------------------
+//
+//------------------------------------------------------------------
+func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
+ restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
+
+ err := c.tracker.Track(trans)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
+ err = fmt.Errorf("Tracking failure")
+ return nil, err
+ }
+
+ subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
+ return nil, err
+ }
+
+ //
+ // Wake subs request
+ //
+ go c.handleSubscriptionCreate(subs, trans)
+ event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side