+func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = trans.GetMtype()
+ params.SubId = int(subs.GetSubId())
+ params.Xid = ""
+ params.Meid = subs.GetMeid()
+ params.Src = ""
+ params.PayloadLen = len(trans.Payload.Buf)
+ params.Payload = trans.Payload.Buf
+ params.Mbuf = nil
+
+ return c.rmrSendRaw(desc, params)
+}
+
+func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = trans.GetMtype()
+ params.SubId = int(subs.GetSubId())
+ params.Xid = trans.GetXid()
+ params.Meid = trans.GetMeid()
+ params.Src = ""
+ params.PayloadLen = len(trans.Payload.Buf)
+ params.Payload = trans.Payload.Buf
+ params.Mbuf = nil
+
+ return c.rmrSendRaw(desc, params)
+}
+
+func (c *Control) Consume(params *xapp.RMRParams) (err error) {
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
+ msg := &RMRParams{params}
+ c.msgCounter++
+ switch msg.Mtype {
+ case xapp.RICMessageTypes["RIC_SUB_REQ"]:
+ go c.handleSubscriptionRequest(msg)
+ case xapp.RICMessageTypes["RIC_SUB_RESP"]:
+ go c.handleSubscriptionResponse(msg)
+ case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
+ go c.handleSubscriptionFailure(msg)
+ case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
+ go c.handleSubscriptionDeleteRequest(msg)
+ case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
+ go c.handleSubscriptionDeleteResponse(msg)
+ case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
+ go c.handleSubscriptionDeleteFailure(msg)
+ default:
+ xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
+ }
+
+ return nil
+}
+
+func (c *Control) handleSubscriptionRequest(params *RMRParams) {
+ xapp.Logger.Info("SubReq from xapp: %s", params.String())
+
+ //
+ //
+ //
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+ params.Xid,
+ params.Meid,
+ false,
+ true)
+
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
+ return
+ }
+
+ //
+ //
+ //
+ trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
+ return
+ }
+
+ //
+ //
+ //
+ subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
+ return
+ }
+
+ err = subs.SetTransaction(trans)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+ subs.Release()
+ trans.Release()
+ return