submgr 0.10.3 published
[ric-plt/submgr.git] / pkg / control / control.go
index ec6419e..291075d 100644 (file)
@@ -46,6 +46,7 @@ type Control struct {
        registry    *Registry
        rtmgrClient *RtmgrClient
        tracker     *Tracker
+       rc_chan     chan *xapp.RMRParams
 }
 
 type RMRMeid struct {
@@ -90,26 +91,16 @@ func NewControl() Control {
        delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
        rtmgrClient := RtmgrClient{client, handle, delete_handle}
 
-       return Control{new(E2ap), registry, &rtmgrClient, tracker}
+       return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
 }
 
 func (c *Control) Run() {
+       go c.controlLoop()
        xapp.Run(c)
 }
 
 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
-       switch rp.Mtype {
-       case C.RIC_SUB_REQ:
-               err = c.handleSubscriptionRequest(rp)
-       case C.RIC_SUB_RESP:
-               err = c.handleSubscriptionResponse(rp)
-       case C.RIC_SUB_DEL_REQ:
-               err = c.handleSubscriptionDeleteRequest(rp)
-       case C.RIC_SUB_DEL_RESP:
-               err = c.handleSubscriptionDeleteResponse(rp)
-       default:
-               err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
-       }
+       c.rc_chan <- rp
        return
 }
 
@@ -127,6 +118,25 @@ func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
        return
 }
 
+func (c *Control) controlLoop() {
+       for {
+               msg := <-c.rc_chan
+               switch msg.Mtype {
+                       case C.RIC_SUB_REQ:
+                               c.handleSubscriptionRequest(msg)
+                       case C.RIC_SUB_RESP:
+                               c.handleSubscriptionResponse(msg)
+                       case C.RIC_SUB_DEL_REQ:
+                               c.handleSubscriptionDeleteRequest(msg)
+                       case C.RIC_SUB_DEL_RESP:
+                               c.handleSubscriptionDeleteResponse(msg)
+                       default:
+                               err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
+                               xapp.Logger.Error("Unknown message type: %v", err)
+               }
+       }
+}
+
 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
        payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
        if err != nil {