+ xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
+ }
+
+ return nil
+}
+func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
+ var retval string = ""
+ var filler string = ""
+ if trans != nil {
+ retval += filler + trans.String()
+ filler = " "
+ }
+ if subs != nil {
+ retval += filler + subs.String()
+ filler = " "
+ }
+ if err != nil {
+ retval += filler + "err(" + err.Error() + ")"
+ filler = " "
+
+ }
+ return retval
+}
+
+//-------------------------------------------------------------------
+// handle from XAPP Subscription Request
+//------------------------------------------------------------------
+func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
+ xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
+
+ subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
+ return
+ }
+
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
+ return
+ }
+ defer trans.Release()
+
+ subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
+ return
+ }
+
+ if subs.IsTransactionReserved() {
+ err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, subs, err))
+ return
+ }
+
+ //
+ // Wake subs request
+ //
+ go c.handleSubscriptionCreate(subs, trans)
+ event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+ err = nil
+ if event != nil {
+ switch themsg := event.(type) {
+ case *e2ap.E2APSubscriptionResponse:
+ trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
+ if err == nil {
+ c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
+ return
+ }
+ case *e2ap.E2APSubscriptionFailure:
+ trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
+ if err == nil {
+ c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
+ }
+ return
+ default:
+ break
+ }
+ }
+ xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
+}
+
+//-------------------------------------------------------------------
+// handle from XAPP Subscription Delete Request
+//------------------------------------------------------------------
+func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
+ xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
+
+ subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
+ return
+ }
+
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
+ return
+ }
+ defer trans.Release()
+
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
+ return
+ }
+
+ if subs.IsTransactionReserved() {
+ err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
+ xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, subs, err))
+ return
+ }
+
+ //
+ // Wake subs delete
+ //
+ go c.handleSubscriptionDelete(subs, trans)
+ trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+ // Whatever is received send ok delete response
+ subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
+ subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
+ subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
+ subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
+ trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
+ if err == nil {
+ c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
+ }
+}
+
+//-------------------------------------------------------------------
+// SUBS CREATE Handling
+//-------------------------------------------------------------------
+func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
+
+ trans := c.tracker.NewTransaction(subs.GetMeid())
+ subs.WaitTransactionTurn(trans)
+ defer subs.ReleaseTransactionTurn(trans)
+ defer trans.Release()
+
+ xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
+
+ if subs.SubRespMsg != nil {
+ xapp.Logger.Debug("SUBS-SubReq: Handling (immediate response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
+ parentTrans.SendEvent(subs.SubRespMsg, 0)
+ return
+ }
+
+ event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
+ switch themsg := event.(type) {
+ case *e2ap.E2APSubscriptionResponse:
+ subs.SubRespMsg = themsg
+ parentTrans.SendEvent(event, 0)
+ return
+ case *e2ap.E2APSubscriptionFailure:
+ //TODO: Possible delete and one retry for subs req
+ parentTrans.SendEvent(event, 0)
+ default:
+ xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
+ c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+ parentTrans.SendEvent(nil, 0)
+ }
+
+ go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+}
+
+//-------------------------------------------------------------------
+// SUBS DELETE Handling
+//-------------------------------------------------------------------
+func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
+
+ trans := c.tracker.NewTransaction(subs.GetMeid())
+ subs.WaitTransactionTurn(trans)
+ defer subs.ReleaseTransactionTurn(trans)
+ defer trans.Release()
+
+ xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
+
+ event := c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+
+ parentTrans.SendEvent(event, 0)
+ go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+}
+
+//-------------------------------------------------------------------
+// send to E2T Subscription Request
+//-------------------------------------------------------------------
+func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+ var err error
+ var event interface{} = nil
+ var timedOut bool = false
+
+ subReqMsg := subs.SubReqMsg
+ subReqMsg.RequestId.Id = 123
+ subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+ trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
+ if err != nil {
+ xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+ return event
+ }
+
+ for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
+ desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
+ c.rmrSend(desc, subs, trans)
+ event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
+ if timedOut {
+ continue
+ }
+ break
+ }
+ xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+ return event
+}
+
+//-------------------------------------------------------------------
+// send to E2T Subscription Delete Request
+//-------------------------------------------------------------------
+
+func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+ var err error
+ var event interface{}
+ var timedOut bool
+
+ subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
+ subDelReqMsg.RequestId.Id = 123
+ subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+ subDelReqMsg.FunctionId = 0
+ trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
+ if err != nil {
+ xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+ return event
+ }
+
+ for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
+ desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
+ c.rmrSend(desc, subs, trans)
+ event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
+ if timedOut {
+ continue
+ }
+ break
+ }
+ xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+ return event
+}
+
+//-------------------------------------------------------------------
+// handle from E2T Subscription Reponse
+//-------------------------------------------------------------------
+func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
+ xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
+ subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
+ if err != nil {
+ xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
+ return
+ }
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
+ if err != nil {
+ xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
+ return
+ }
+ trans := subs.GetTransaction()
+ if trans == nil {
+ err = fmt.Errorf("Ongoing transaction not found")
+ xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
+ return
+ }
+ sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
+ if sendOk == false {
+ err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
+ xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))