- // Setting new subscription ID in the RMR header
- params.SubId = int(newSubId)
- xapp.Logger.Info("Forwarding Subscription Request to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v",params.Mtype, params.SubId, params.Xid, params.Meid)
- err = c.rmrSend(params)
- if err != nil {
- xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- } /*else {
- c.timerMap.StartTimer(newSubId, subReqTime, c.handleSubscriptionRequestTimer)
- }*/
- xapp.Logger.Debug("--- Debugging transaction table = %v", c.tracker.transactionTable)
+func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
+
+ params := &xapp.RMRParams{}
+ params.Mtype = trans.GetMtype()
+ params.SubId = int(subs.GetReqId().InstanceId)
+ params.Xid = trans.GetXid()
+ params.Meid = trans.GetMeid()
+ params.Src = ""
+ params.PayloadLen = len(trans.Payload.Buf)
+ params.Payload = trans.Payload.Buf
+ params.Mbuf = nil
+ xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
+ return c.SendWithRetry(params, false, 5)
+}
+
+func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+ if c.RMRClient == nil {
+ err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
+ xapp.Logger.Error("%s", err.Error())
+ return
+ }
+ c.CntRecvMsg++
+
+ defer c.RMRClient.Free(msg.Mbuf)
+
+ // xapp-frame might use direct access to c buffer and
+ // when msg.Mbuf is freed, someone might take it into use
+ // and payload data might be invalid inside message handle function
+ //
+ // subscriptions won't load system a lot so there is no
+ // real performance hit by cloning buffer into new go byte slice
+ cPay := append(msg.Payload[:0:0], msg.Payload...)
+ msg.Payload = cPay
+ msg.PayloadLen = len(cPay)
+
+ switch msg.Mtype {
+ case xapp.RIC_SUB_REQ:
+ go c.handleXAPPSubscriptionRequest(msg)
+ case xapp.RIC_SUB_RESP:
+ go c.handleE2TSubscriptionResponse(msg)
+ case xapp.RIC_SUB_FAILURE:
+ go c.handleE2TSubscriptionFailure(msg)
+ case xapp.RIC_SUB_DEL_REQ:
+ go c.handleXAPPSubscriptionDeleteRequest(msg)
+ case xapp.RIC_SUB_DEL_RESP:
+ go c.handleE2TSubscriptionDeleteResponse(msg)
+ case xapp.RIC_SUB_DEL_FAILURE:
+ go c.handleE2TSubscriptionDeleteFailure(msg)
+ default:
+ xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
+ }