registry *Registry
rtmgrClient *RtmgrClient
tracker *Tracker
+ rc_chan chan *xapp.RMRParams
}
type RMRMeid struct {
delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
rtmgrClient := RtmgrClient{client, handle, delete_handle}
- return Control{new(E2ap), registry, &rtmgrClient, tracker}
+ return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
}
func (c *Control) Run() {
+ go c.controlLoop()
xapp.Run(c)
}
func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
- switch rp.Mtype {
- case C.RIC_SUB_REQ:
- err = c.handleSubscriptionRequest(rp)
- case C.RIC_SUB_RESP:
- err = c.handleSubscriptionResponse(rp)
- case C.RIC_SUB_DEL_REQ:
- err = c.handleSubscriptionDeleteRequest(rp)
- case C.RIC_SUB_DEL_RESP:
- err = c.handleSubscriptionDeleteResponse(rp)
- default:
- err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
- }
+ c.rc_chan <- rp
return
}
return
}
+func (c *Control) controlLoop() {
+ for {
+ msg := <-c.rc_chan
+ switch msg.Mtype {
+ case C.RIC_SUB_REQ:
+ c.handleSubscriptionRequest(msg)
+ case C.RIC_SUB_RESP:
+ c.handleSubscriptionResponse(msg)
+ case C.RIC_SUB_DEL_REQ:
+ c.handleSubscriptionDeleteRequest(msg)
+ case C.RIC_SUB_DEL_RESP:
+ c.handleSubscriptionDeleteResponse(msg)
+ default:
+ err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
+ xapp.Logger.Error("Unknown message type: %v", err)
+ }
+ }
+}
+
func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
if err != nil {