X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=E2Manager%2FrmrCgo%2FrmrCgoApi.go;h=5ec09b984d1809a3e9b25b9d019871e1eceff073;hb=a561a01554d9f29181595f6f9696097c2e98afc3;hp=f65d10dfbfae1470daf31ceb66091a9c9e44faa9;hpb=efcb4528362460aa2249d319c9752b63bb720fe2;p=ric-plt%2Fe2mgr.git diff --git a/E2Manager/rmrCgo/rmrCgoApi.go b/E2Manager/rmrCgo/rmrCgoApi.go index f65d10d..5ec09b9 100644 --- a/E2Manager/rmrCgo/rmrCgoApi.go +++ b/E2Manager/rmrCgo/rmrCgoApi.go @@ -20,7 +20,7 @@ package rmrCgo -// #cgo LDFLAGS: -L/usr/local/lib -lrmr_nng -lnng +// #cgo LDFLAGS: -L/usr/local/lib -lrmr_si // #include // #include import "C" @@ -61,7 +61,6 @@ func (ctx *Context) SendMsg(msg *MBuf, printLogs bool) (*MBuf, error) { ctx.checkContextInitialized() ctx.Logger.Debugf("#rmrCgoApi.SendMsg - Going to send message. MBuf: %v", *msg) allocatedCMBuf := ctx.getAllocatedCRmrMBuf(ctx.Logger, msg, ctx.MaxMsgSize) - defer C.rmr_free_msg(allocatedCMBuf) state := allocatedCMBuf.state if state != RMR_OK { errorMessage := fmt.Sprintf("#rmrCgoApi.SendMsg - Failed to get allocated message. state: %v - %s", state, states[int(state)]) @@ -76,6 +75,8 @@ func (ctx *Context) SendMsg(msg *MBuf, printLogs bool) (*MBuf, error) { } currCMBuf := C.rmr_send_msg(ctx.RmrCtx, allocatedCMBuf) + defer C.rmr_free_msg(currCMBuf) + state = currCMBuf.state if state != RMR_OK { @@ -86,13 +87,49 @@ func (ctx *Context) SendMsg(msg *MBuf, printLogs bool) (*MBuf, error) { return convertToMBuf(ctx.Logger, currCMBuf), nil } +func (ctx *Context) WhSendMsg(msg *MBuf, printLogs bool) (*MBuf, error) { + ctx.checkContextInitialized() + ctx.Logger.Debugf("#rmrCgoApi.WhSendMsg - Going to wormhole send message. MBuf: %v", *msg) + + whid := C.rmr_wh_open(ctx.RmrCtx, (*C.char)(msg.GetMsgSrc())) // open direct connection, returns wormhole ID + ctx.Logger.Infof("#rmrCgoApi.WhSendMsg - The wormhole id %v has been received", whid) + defer C.rmr_wh_close(ctx.RmrCtx, whid) + + allocatedCMBuf := ctx.getAllocatedCRmrMBuf(ctx.Logger, msg, ctx.MaxMsgSize) + state := allocatedCMBuf.state + if state != RMR_OK { + errorMessage := fmt.Sprintf("#rmrCgoApi.WhSendMsg - Failed to get allocated message. state: %v - %s", state, states[int(state)]) + return nil, errors.New(errorMessage) + } + + if printLogs { + transactionId := string(*msg.XAction) + tmpTid := strings.TrimSpace(transactionId) + ctx.Logger.Infof("[E2 Manager -> RMR] #rmrCgoApi.WhSendMsg - Going to send message %v for transaction id: %s", *msg, tmpTid) + } + + currCMBuf := C.rmr_wh_send_msg(ctx.RmrCtx, whid, allocatedCMBuf) + defer C.rmr_free_msg(currCMBuf) + + state = currCMBuf.state + + if state != RMR_OK { + errorMessage := fmt.Sprintf("#rmrCgoApi.WhSendMsg - Failed to send message. state: %v - %s", state, states[int(state)]) + return nil, errors.New(errorMessage) + } + + return convertToMBuf(ctx.Logger, currCMBuf), nil +} + + func (ctx *Context) RecvMsg() (*MBuf, error) { ctx.checkContextInitialized() ctx.Logger.Debugf("#rmrCgoApi.RecvMsg - Going to receive message") allocatedCMBuf := C.rmr_alloc_msg(ctx.RmrCtx, C.int(ctx.MaxMsgSize)) - defer C.rmr_free_msg(allocatedCMBuf) currCMBuf := C.rmr_rcv_msg(ctx.RmrCtx, allocatedCMBuf) + defer C.rmr_free_msg(currCMBuf) + state := currCMBuf.state if state != RMR_OK { @@ -121,4 +158,4 @@ func (ctx *Context) Close() { ctx.Logger.Debugf("#rmrCgoApi.Close - Going to close RMR context") C.rmr_close(ctx.RmrCtx) time.Sleep(100 * time.Millisecond) -} +} \ No newline at end of file