+//-------------------------------------------------------------------
+//
+//------------------------------------------------------------------
+func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
+ restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
+
+ err := c.tracker.Track(trans)
+ if err != nil {
+ err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return nil, err
+ }
+
+ subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
+ if err != nil {
+ err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return nil, err
+ }
+
+ //
+ // Wake subs request
+ //
+ go c.handleSubscriptionCreate(subs, trans)
+ event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+ err = nil
+ if event != nil {
+ switch themsg := event.(type) {
+ case *e2ap.E2APSubscriptionResponse:
+ trans.Release()
+ return themsg, nil
+ case *e2ap.E2APSubscriptionFailure:
+ err = fmt.Errorf("SubscriptionFailure received")
+ return nil, err
+ default:
+ break
+ }
+ }
+ err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
+ xapp.Logger.Error("%s", err.Error())
+ c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
+ return nil, err
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
+
+ c.CntRecvMsg++
+ c.UpdateCounter(cRestSubDelReqFromXapp)
+
+ xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
+
+ restSubscription, err := c.registry.GetRESTSubscription(restSubId)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ if restSubscription == nil {
+ // Subscription was not found
+ return nil
+ } else {
+ if restSubscription.SubReqOngoing == true {
+ err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
+ xapp.Logger.Error("%s", err.Error())
+ return err
+ } else if restSubscription.SubDelReqOngoing == true {
+ // Previous request for same restSubId still ongoing
+ return nil
+ }
+ }
+ }
+
+ xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
+ go func() {
+ for _, instanceId := range restSubscription.InstanceIds {
+ err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
+ if err != nil {
+ xapp.Logger.Error("%s", err.Error())
+ //return err
+ }
+ xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
+ restSubscription.DeleteInstanceId(instanceId)
+ }
+ c.registry.DeleteRESTSubscription(&restSubId)
+ }()
+
+ c.UpdateCounter(cRestSubDelRespToXapp)
+
+ return nil
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
+
+ xid := *restSubId + "_" + strconv.FormatUint(uint64(instanceId), 10)
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), xid, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
+ //trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
+ if trans == nil {
+ err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
+ xapp.Logger.Error("%s", err.Error())
+ }
+ defer trans.Release()
+
+ err := c.tracker.Track(trans)
+ if err != nil {
+ err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return &time.ParseError{}
+ }
+
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
+ if err != nil {
+ err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
+ xapp.Logger.Error("%s", err.Error())
+ return err
+ }
+ //
+ // Wake subs delete
+ //
+ go c.handleSubscriptionDelete(subs, trans)
+ trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+ xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
+
+ c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
+
+ return nil
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) QueryHandler() (models.SubscriptionList, error) {
+ xapp.Logger.Info("QueryHandler() called")
+
+ c.CntRecvMsg++
+
+ return c.registry.QueryHandler()
+}
+
+func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
+ xapp.Logger.Info("TestRestHandler() called")
+
+ pathParams := mux.Vars(r)
+ s := pathParams["testId"]
+
+ // This can be used to delete single subscription from db
+ if contains := strings.Contains(s, "deletesubid="); contains == true {
+ var splits = strings.Split(s, "=")
+ if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
+ xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
+ c.RemoveSubscriptionFromSdl(uint32(subId))
+ return
+ }
+ }
+
+ // This can be used to remove all subscriptions db from
+ if s == "emptydb" {
+ xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
+ c.RemoveAllSubscriptionsFromSdl()
+ return
+ }
+
+ // This is meant to cause submgr's restart in testing
+ if s == "restart" {
+ xapp.Logger.Info("os.Exit(1) called")
+ os.Exit(1)
+ }
+
+ xapp.Logger.Info("Unsupported rest command received %s", s)
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+
+func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
+ params := &xapp.RMRParams{}
+ params.Mtype = trans.GetMtype()
+ params.SubId = int(subs.GetReqId().InstanceId)
+ params.Xid = ""
+ params.Meid = subs.GetMeid()
+ params.Src = ""
+ params.PayloadLen = len(trans.Payload.Buf)
+ params.Payload = trans.Payload.Buf
+ params.Mbuf = nil
+ xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
+ err = c.SendWithRetry(params, false, 5)
+ if err != nil {
+ xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
+ }
+ return err
+}
+
+func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
+
+ params := &xapp.RMRParams{}
+ params.Mtype = trans.GetMtype()
+ params.SubId = int(subs.GetReqId().InstanceId)
+ params.Xid = trans.GetXid()
+ params.Meid = trans.GetMeid()
+ params.Src = ""
+ params.PayloadLen = len(trans.Payload.Buf)
+ params.Payload = trans.Payload.Buf
+ params.Mbuf = nil
+ xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
+ err = c.SendWithRetry(params, false, 5)
+ if err != nil {
+ xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
+ }
+ return err
+}
+
+func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+ if c.RMRClient == nil {
+ err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
+ xapp.Logger.Error("%s", err.Error())
+ return
+ }
+ c.CntRecvMsg++
+
+ defer c.RMRClient.Free(msg.Mbuf)
+
+ // xapp-frame might use direct access to c buffer and
+ // when msg.Mbuf is freed, someone might take it into use
+ // and payload data might be invalid inside message handle function
+ //
+ // subscriptions won't load system a lot so there is no
+ // real performance hit by cloning buffer into new go byte slice
+ cPay := append(msg.Payload[:0:0], msg.Payload...)
+ msg.Payload = cPay
+ msg.PayloadLen = len(cPay)
+
+ switch msg.Mtype {
+ case xapp.RIC_SUB_REQ:
+ go c.handleXAPPSubscriptionRequest(msg)
+ case xapp.RIC_SUB_RESP:
+ go c.handleE2TSubscriptionResponse(msg)
+ case xapp.RIC_SUB_FAILURE:
+ go c.handleE2TSubscriptionFailure(msg)
+ case xapp.RIC_SUB_DEL_REQ:
+ go c.handleXAPPSubscriptionDeleteRequest(msg)
+ case xapp.RIC_SUB_DEL_RESP:
+ go c.handleE2TSubscriptionDeleteResponse(msg)
+ case xapp.RIC_SUB_DEL_FAILURE:
+ go c.handleE2TSubscriptionDeleteFailure(msg)
+ default:
+ xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)