+ xapp.Run(c)
+}
+
+func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
+ switch rp.Mtype {
+ case C.RIC_SUB_REQ:
+ err = c.handleSubscriptionRequest(rp)
+ case C.RIC_SUB_RESP:
+ err = c.handleSubscriptionResponse(rp)
+ case C.RIC_SUB_DEL_REQ:
+ err = c.handleSubscriptionDeleteRequest(rp)
+ default:
+ err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
+ }
+ return
+}
+
+func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
+ if !xapp.Rmr.Send(params, false) {
+ err = errors.New("rmr.Send() failed")
+ }
+ return
+}
+
+func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
+ payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
+ if err != nil {
+ err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+ return
+ }
+ xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
+
+ /* Reserve a sequence number and set it in the payload */
+ new_sub_id := c.registry.ReserveSequenceNumber()
+
+ _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
+ if err != nil {
+ err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
+ return
+ }
+
+ src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
+ if err != nil {
+ xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
+ return
+ }
+
+ /* Create transatcion records for every subscription request */
+ xact_key := Transaction_key{new_sub_id, CREATE}
+ xact_value := Transaction{*src_addr, *src_port, params.Payload}
+ err = c.tracker.Track_transaction(xact_key, xact_value)
+ if err != nil {
+ xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+ return
+ }
+
+ /* Update routing manager about the new subscription*/
+ sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
+ go c.rtmgrClient.SubscriptionRequestUpdate()
+ SubscriptionReqChan <- sub_route_action
+
+ // Setting new subscription ID in the RMR header
+ params.SubId = int(new_sub_id)
+
+ xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
+ c.rmrSend(params)
+ return