X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=E2Manager%2FrmrCgo%2FrmrCgoApi.go;h=4a9f8c4fde07b15159384c285898e192259c68b7;hb=b4b908e9d15ec94c7d3c785f32105f20e314af4d;hp=2de57d9d871ef7b273807c55f7ac2cf4644f4e33;hpb=07ef76dd471a0892a893c90e0ab06713aee34be1;p=ric-plt%2Fe2mgr.git diff --git a/E2Manager/rmrCgo/rmrCgoApi.go b/E2Manager/rmrCgo/rmrCgoApi.go index 2de57d9..4a9f8c4 100644 --- a/E2Manager/rmrCgo/rmrCgoApi.go +++ b/E2Manager/rmrCgo/rmrCgoApi.go @@ -31,7 +31,7 @@ import ( "e2mgr/logger" ) -func (*Context) Init(port string, maxMsgSize int, flags int, logger *logger.Logger) *RmrMessenger {//TODO remove pointer from interface +func (*Context) Init(port string, maxMsgSize int, flags int, logger *logger.Logger) RmrMessenger { pp := C.CString(port) defer C.free(unsafe.Pointer(pp)) logger.Debugf("#rmrCgoApi.Init - Going to initiate RMR router") @@ -46,19 +46,22 @@ func (*Context) Init(port string, maxMsgSize int, flags int, logger *logger.Logg } } logger.Infof("#rmrCgoApi.Init - RMR router has been initiated") + + // Configure the rmr to make rounds of attempts to send a message before notifying the application that it should retry. + // Each round is about 1000 attempts with a short sleep between each round. + C.rmr_set_stimeout(ctx.RmrCtx, C.int(1000)) r := RmrMessenger(ctx) - return &r + return r } -func (ctx *Context) SendMsg(msg *MBuf, maxMsgSize int) (*MBuf, error) { +func (ctx *Context) SendMsg(msg *MBuf) (*MBuf, error) { ctx.checkContextInitialized() ctx.Logger.Debugf("#rmrCgoApi.SendMsg - Going to send message. MBuf: %v", *msg) - allocatedCMBuf := ctx.getAllocatedCRmrMBuf(ctx.Logger, msg, maxMsgSize) + allocatedCMBuf := ctx.getAllocatedCRmrMBuf(ctx.Logger, msg, ctx.MaxMsgSize) defer C.rmr_free_msg(allocatedCMBuf) state := allocatedCMBuf.state if state != RMR_OK { errorMessage := fmt.Sprintf("#rmrCgoApi.SendMsg - Failed to get allocated message. state: %v - %s", state, states[int(state)]) - ctx.Logger.Errorf(errorMessage) return nil, errors.New(errorMessage) } @@ -73,7 +76,6 @@ func (ctx *Context) SendMsg(msg *MBuf, maxMsgSize int) (*MBuf, error) { if state != RMR_OK { errorMessage := fmt.Sprintf("#rmrCgoApi.SendMsg - Failed to send message. state: %v - %s", state, states[int(state)]) - ctx.Logger.Errorf(errorMessage) return nil, errors.New(errorMessage) } @@ -103,14 +105,6 @@ func (ctx *Context) RecvMsg() (*MBuf, error) { return mbuf, nil } -func (ctx *Context) RtsMsg(msg *MBuf) { - ctx.checkContextInitialized() - ctx.Logger.Debugf("#rmrCgoApi.RtsMsg - Going to return message to the sender") - allocatedCMBuf := C.rmr_alloc_msg(ctx.RmrCtx, C.int(ctx.MaxMsgSize)) - defer C.rmr_free_msg(allocatedCMBuf) - C.rmr_rts_msg(ctx.RmrCtx, allocatedCMBuf) -} - func (ctx *Context) IsReady() bool { ctx.Logger.Debugf("#rmrCgoApi.IsReady - Going to check if routing table is initialized") return int(C.rmr_ready(ctx.RmrCtx)) != 0