import (
"e2mgr/logger"
- "e2mgr/managers"
"e2mgr/models"
"e2mgr/rmrCgo"
"e2mgr/sessions"
"sync"
)
-
type RmrConfig struct {
Port int
MaxMsgSize int
// RmrService holds an instance of RMR messenger as well as its configuration
type RmrService struct {
- config *RmrConfig
- messenger *rmrCgo.RmrMessenger
- e2sessions sessions.E2Sessions
- nManager *managers.NotificationManager
- rmrResponse chan *models.NotificationResponse
+ Config *RmrConfig
+ Messenger *rmrCgo.RmrMessenger
+ E2sessions sessions.E2Sessions
+ RmrResponse chan *models.NotificationResponse
}
// NewRmrService instantiates a new Rmr service instance
-func NewRmrService(rmrConfig *RmrConfig, msrImpl rmrCgo.RmrMessenger, e2sessions sessions.E2Sessions, nManager *managers.NotificationManager,
- rmrResponse chan *models.NotificationResponse) *RmrService {
+func NewRmrService(rmrConfig *RmrConfig, msrImpl rmrCgo.RmrMessenger, e2sessions sessions.E2Sessions, rmrResponse chan *models.NotificationResponse) *RmrService {
return &RmrService{
- config: rmrConfig,
- messenger: msrImpl.Init("tcp:"+strconv.Itoa(rmrConfig.Port), rmrConfig.MaxMsgSize, rmrConfig.Flags, rmrConfig.Logger),
- e2sessions: e2sessions,
- nManager: nManager,
- rmrResponse: rmrResponse,
+ Config: rmrConfig,
+ Messenger: msrImpl.Init("tcp:"+strconv.Itoa(rmrConfig.Port), rmrConfig.MaxMsgSize, rmrConfig.Flags, rmrConfig.Logger),
+ E2sessions: e2sessions,
+ RmrResponse: rmrResponse,
}
}
wg.Add(1)
setupRequestMessage := <-messageChannel
- e2Message := setupRequestMessage.GetMessageAsBytes(r.config.Logger)
+ e2Message := setupRequestMessage.GetMessageAsBytes(r.Config.Logger)
transactionId := []byte(setupRequestMessage.TransactionId())
- msg := rmrCgo.NewMBuf(messageType, len(e2Message)/*r.config.MaxMsgSize*/, setupRequestMessage.RanName(), &e2Message, &transactionId)
+ msg := rmrCgo.NewMBuf(messageType, len(e2Message) /*r.config.MaxMsgSize*/, setupRequestMessage.RanName(), &e2Message, &transactionId)
- r.config.Logger.Debugf("#rmr_service.SendMessage - Going to send the message: %#v\n", msg)
- _, err := (*r.messenger).SendMsg(msg, r.config.MaxMsgSize)
+ r.Config.Logger.Debugf("#rmr_service.SendMessage - Going to send the message: %#v\n", msg)
+ _, err := (*r.Messenger).SendMsg(msg, r.Config.MaxMsgSize)
errorChannel <- err
wg.Done()
}
-func (r *RmrService) SendRmrMessage(response *models.NotificationResponse) {
+func (r *RmrService) SendRmrMessage(response *models.NotificationResponse) error {
- msgAsBytes := response.GetMessageAsBytes(r.config.Logger)
+ msgAsBytes := response.GetMessageAsBytes(r.Config.Logger)
transactionIdByteArr := []byte(response.RanName)
msg := rmrCgo.NewMBuf(response.MgsType, len(msgAsBytes), response.RanName, &msgAsBytes, &transactionIdByteArr)
- r.config.Logger.Debugf("#rmr_service.SendRmrMessage - Going to send the message: %#v\n", msg)
-
- _, err := (*r.messenger).SendMsg(msg, r.config.MaxMsgSize)
+ _, err := (*r.Messenger).SendMsg(msg, r.Config.MaxMsgSize)
if err != nil {
- r.config.Logger.Errorf("#rmr_service.SendRmrMessage - error: %#v\n", err)
- return
+ return err
}
+ return nil
}
-// ListenAndHandle waits for messages coming from rmr_rcv_msg and sends it to a designated message handler
-func (r *RmrService) ListenAndHandle() {
-
+func (r *RmrService) SendResponse() {
for {
- mbuf, err := (*r.messenger).RecvMsg()
- r.config.Logger.Debugf("#rmr_service.ListenAndHandle - Going to handle received message: %#v\n", mbuf)
- // TODO: one mbuf received immediately execute goroutine
- if err != nil {
- continue //TODO log error
- }
-
- r.nManager.HandleMessage(r.config.Logger, r.e2sessions, mbuf, r.rmrResponse)
- }
-}
-
-func (r *RmrService) SendResponse(){
- for{
-
- response, ok := <-r.rmrResponse
+ response, ok := <-r.RmrResponse
if !ok {
- r.config.Logger.Errorf("#rmr_service.SendResponse - channel closed")
+ r.Config.Logger.Errorf("#rmr_service.SendResponse - channel closed")
break
}
- r.config.Logger.Debugf("#rmr_service.SendResponse - Going to send message: %#v\n", response)
- r.SendRmrMessage(response)
+ r.Config.Logger.Debugf("#rmr_service.SendResponse - Going to send message: %#v\n", response)
+ if err := r.SendRmrMessage(response); err != nil {
+ r.Config.Logger.Errorf("#rmr_service.SendResponse - error: %#v\n", err)
+ }
}
}
func (r *RmrService) CloseContext() {
- if r.config.Logger.DebugEnabled(){
- r.config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.messenger).IsReady())
- (*r.messenger).Close()
- r.config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.messenger).IsReady())
+ if r.Config.Logger.DebugEnabled() {
+ r.Config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.Messenger).IsReady())
+ (*r.Messenger).Close()
+ r.Config.Logger.Debugf("#rmr_service.CloseContext - RMR is ready: %v", (*r.Messenger).IsReady())
}
}
-