X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=E2Manager%2Fservices%2Frmr_service.go;h=dc35dfbef9d7e74831baf40808089b0540b6276b;hb=refs%2Fchanges%2F85%2F885%2F1;hp=5a8be8b3d5695e31885429eb8cd200d7dff69fdf;hpb=07ef76dd471a0892a893c90e0ab06713aee34be1;p=ric-plt%2Fe2mgr.git diff --git a/E2Manager/services/rmr_service.go b/E2Manager/services/rmr_service.go index 5a8be8b..dc35dfb 100644 --- a/E2Manager/services/rmr_service.go +++ b/E2Manager/services/rmr_service.go @@ -19,7 +19,6 @@ package services import ( "e2mgr/logger" - "e2mgr/managers" "e2mgr/models" "e2mgr/rmrCgo" "e2mgr/sessions" @@ -27,7 +26,6 @@ import ( "sync" ) - type RmrConfig struct { Port int MaxMsgSize int @@ -41,23 +39,20 @@ func NewRmrConfig(port int, maxMsgSize int, flags int, logger *logger.Logger) *R // RmrService holds an instance of RMR messenger as well as its configuration type RmrService struct { - config *RmrConfig - messenger *rmrCgo.RmrMessenger - e2sessions sessions.E2Sessions - nManager *managers.NotificationManager - rmrResponse chan *models.NotificationResponse + Config *RmrConfig + Messenger *rmrCgo.RmrMessenger + E2sessions sessions.E2Sessions + RmrResponse chan *models.NotificationResponse } // NewRmrService instantiates a new Rmr service instance -func NewRmrService(rmrConfig *RmrConfig, msrImpl rmrCgo.RmrMessenger, e2sessions sessions.E2Sessions, nManager *managers.NotificationManager, - rmrResponse chan *models.NotificationResponse) *RmrService { +func NewRmrService(rmrConfig *RmrConfig, msrImpl rmrCgo.RmrMessenger, e2sessions sessions.E2Sessions, rmrResponse chan *models.NotificationResponse) *RmrService { return &RmrService{ - config: rmrConfig, - messenger: msrImpl.Init("tcp:"+strconv.Itoa(rmrConfig.Port), rmrConfig.MaxMsgSize, rmrConfig.Flags, rmrConfig.Logger), - e2sessions: e2sessions, - nManager: nManager, - rmrResponse: rmrResponse, + Config: rmrConfig, + Messenger: msrImpl.Init("tcp:"+strconv.Itoa(rmrConfig.Port), rmrConfig.MaxMsgSize, rmrConfig.Flags, rmrConfig.Logger), + E2sessions: e2sessions, + RmrResponse: rmrResponse, } } @@ -66,72 +61,55 @@ func (r *RmrService) SendMessage(messageType int, messageChannel chan *models.E2 wg.Add(1) setupRequestMessage := <-messageChannel - e2Message := setupRequestMessage.GetMessageAsBytes(r.config.Logger) + e2Message := setupRequestMessage.GetMessageAsBytes(r.Config.Logger) transactionId := []byte(setupRequestMessage.TransactionId()) - msg := rmrCgo.NewMBuf(messageType, len(e2Message)/*r.config.MaxMsgSize*/, setupRequestMessage.RanName(), &e2Message, &transactionId) + msg := rmrCgo.NewMBuf(messageType, len(e2Message) /*r.config.MaxMsgSize*/, setupRequestMessage.RanName(), &e2Message, &transactionId) - r.config.Logger.Debugf("#rmr_service.SendMessage - Going to send the message: %#v\n", msg) - _, err := (*r.messenger).SendMsg(msg, r.config.MaxMsgSize) + r.Config.Logger.Debugf("#rmr_service.SendMessage - Going to send the message: %#v\n", msg) + _, err := (*r.Messenger).SendMsg(msg, r.Config.MaxMsgSize) errorChannel <- err wg.Done() } -func (r *RmrService) SendRmrMessage(response *models.NotificationResponse) { +func (r *RmrService) SendRmrMessage(response *models.NotificationResponse) error { - msgAsBytes := response.GetMessageAsBytes(r.config.Logger) + msgAsBytes := response.GetMessageAsBytes(r.Config.Logger) transactionIdByteArr := []byte(response.RanName) msg := rmrCgo.NewMBuf(response.MgsType, len(msgAsBytes), response.RanName, &msgAsBytes, &transactionIdByteArr) - r.config.Logger.Debugf("#rmr_service.SendRmrMessage - Going to send the message: %#v\n", msg) - - _, err := (*r.messenger).SendMsg(msg, r.config.MaxMsgSize) + _, err := (*r.Messenger).SendMsg(msg, r.Config.MaxMsgSize) if err != nil { - r.config.Logger.Errorf("#rmr_service.SendRmrMessage - error: %#v\n", err) - return + return err } + return nil } -// ListenAndHandle waits for messages coming from rmr_rcv_msg and sends it to a designated message handler -func (r *RmrService) ListenAndHandle() { - +func (r *RmrService) SendResponse() { for { - mbuf, err := (*r.messenger).RecvMsg() - r.config.Logger.Debugf("#rmr_service.ListenAndHandle - Going to handle received message: %#v\n", mbuf) - // TODO: one mbuf received immediately execute goroutine - if err != nil { - continue //TODO log error - } - - r.nManager.HandleMessage(r.config.Logger, r.e2sessions, mbuf, r.rmrResponse) - } -} - -func (r *RmrService) SendResponse(){ - for{ - - response, ok := <-r.rmrResponse + response, ok := <-r.RmrResponse if !ok { - r.config.Logger.Errorf("#rmr_service.SendResponse - channel closed") + r.Config.Logger.Errorf("#rmr_service.SendResponse - channel closed") break } - r.config.Logger.Debugf("#rmr_service.SendResponse - Going to send message: %#v\n", response) - r.SendRmrMessage(response) + r.Config.Logger.Debugf("#rmr_service.SendResponse - Going to send message: %#v\n", response) + if err := r.SendRmrMessage(response); err != nil { + r.Config.Logger.Errorf("#rmr_service.SendResponse - error: %#v\n", err) + } } } func (r *RmrService) CloseContext() { - if r.config.Logger.DebugEnabled(){ - r.config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.messenger).IsReady()) - (*r.messenger).Close() - r.config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.messenger).IsReady()) + if r.Config.Logger.DebugEnabled() { + r.Config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.Messenger).IsReady()) + (*r.Messenger).Close() + r.Config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.Messenger).IsReady()) } } -