X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Frmr.go;h=530f2230ee756e9e0613fd8129de52affea9ef3f;hb=20a36385164677ba84d7f4159eb65b3e9c42d952;hp=9f988dcf1d396acc714a7d13890dd2c6729f0aa7;hpb=f22b458846a20a4a9fcafb49e3195ab44a16840e;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index 9f988dc..530f223 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -127,25 +127,60 @@ func (params *RMRParams) String() string { //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient { - p := C.CString(protPort) - m := C.int(maxSize) - c := C.int(threadType) +type RMRClientParams struct { + protPort string + maxSize int + threadType int + statDesc string + lowLatency bool + fastAck bool +} + +func (params *RMRClientParams) String() string { + return fmt.Sprintf("protPort=%s maxSize=%d threadType=%d statDesc=%s lowLatency=%t fastAck=%t", + params.protPort, params.maxSize, params.threadType, params.statDesc, params.lowLatency, params.fastAck) +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { + p := C.CString(params.protPort) + m := C.int(params.maxSize) + c := C.int(params.threadType) defer C.free(unsafe.Pointer(p)) ctx := C.rmr_init(p, m, c) if ctx == nil { Logger.Error("rmrClient: Initializing RMR context failed, bailing out!") } + + Logger.Info("new rmrClient with parameters: %s", params.String()) + + if params.lowLatency { + C.rmr_set_low_latency(ctx) + } + if params.fastAck { + C.rmr_set_fack(ctx) + } + return &RMRClient{ - protPort: protPort, + protPort: params.protPort, context: ctx, consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.statDesc), } } func NewRMRClient() *RMRClient { - return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR") + return NewRMRClientWithParams( + &RMRClientParams{ + protPort: viper.GetString("rmr.protPort"), + maxSize: viper.GetInt("rmr.maxSize"), + threadType: viper.GetInt("rmr.threadType"), + statDesc: "RMR", + lowLatency: viper.GetBool("rmr.lowLatency"), + fastAck: viper.GetBool("rmr.fastAck"), + }) } func (m *RMRClient) Start(c MessageConsumer) { @@ -313,6 +348,10 @@ func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duratio func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t { + if params == nil { + return nil + } + payLen := len(params.Payload) if params.PayloadLen != 0 { payLen = params.PayloadLen @@ -337,20 +376,21 @@ func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t { datap := C.CBytes(params.Payload) defer C.free(datap) - if params != nil { - if params.Meid != nil { - b := make([]byte, int(C.RMR_MAX_MEID)) - copy(b, []byte(params.Meid.RanName)) - C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) - } - xidLen := len(params.Xid) - if xidLen > 0 && xidLen <= C.RMR_MAX_XID { - b := make([]byte, int(C.RMR_MAX_XID)) - copy(b, []byte(params.Xid)) - C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) - } + if params.Meid != nil { + b := make([]byte, int(C.RMR_MAX_MEID)) + copy(b, []byte(params.Meid.RanName)) + C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) + } + + xidLen := len(params.Xid) + if xidLen > 0 && xidLen <= C.RMR_MAX_XID { + b := make([]byte, int(C.RMR_MAX_XID)) + copy(b, []byte(params.Xid)) + C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) } + C.write_bytes_array(txBuffer.payload, datap, txBuffer.len) + return txBuffer }