+func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = trans.GetMtype()
+ params.SubId = int(subs.GetSubId())
+ params.Xid = ""
+ params.Meid = subs.GetMeid()
+ params.Src = ""
+ params.PayloadLen = payloadLen
+ params.Payload = payload
+ params.Mbuf = nil
+
+ return c.rmrSendRaw(desc, params)
+}
+
+func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = mType
+ params.SubId = int(subs.GetSubId())
+ params.Xid = trans.GetXid()
+ params.Meid = trans.GetMeid()
+ params.Src = ""
+ params.PayloadLen = payloadLen
+ params.Payload = payload
+ params.Mbuf = nil
+
+ return c.rmrSendRaw(desc, params)
+}
+
+func (c *Control) Consume(params *xapp.RMRParams) (err error) {
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
+ msg := &RMRParams{params}
+ c.msgCounter++
+ switch msg.Mtype {
+ case xapp.RICMessageTypes["RIC_SUB_REQ"]:
+ go c.handleSubscriptionRequest(msg)
+ case xapp.RICMessageTypes["RIC_SUB_RESP"]:
+ go c.handleSubscriptionResponse(msg)
+ case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
+ go c.handleSubscriptionFailure(msg)
+ case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
+ go c.handleSubscriptionDeleteRequest(msg)
+ case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
+ go c.handleSubscriptionDeleteResponse(msg)
+ case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
+ go c.handleSubscriptionDeleteFailure(msg)
+ default:
+ xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
+ }
+
+ return nil
+}
+
+func (c *Control) handleSubscriptionRequest(params *RMRParams) {
+ xapp.Logger.Info("SubReq from xapp: %s", params.String())
+
+ //
+ //
+ //
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+ params.Mtype,
+ params.Xid,
+ params.Meid,
+ false,
+ true)
+
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
+ return
+ }
+
+ //
+ //
+ //
+ trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
+ return
+ }
+
+ //
+ //
+ //
+ subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
+ return
+ }
+
+ err = subs.SetTransaction(trans)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+ subs.Release()
+ trans.Release()
+ return
+ }
+
+ trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+
+ //
+ // TODO: subscription create is in fact owned by subscription and not transaction.
+ // Transaction is toward xapp while Subscription is toward ran.
+ // In merge several xapps may wake transactions, while only one subscription
+ // toward ran occurs -> subscription owns subscription creation toward ran
+ //
+ // This is intermediate solution while improving message handling
+ //
+ packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
+ subs.Release()
+ trans.Release()
+ return