X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Frmr.go;h=530f2230ee756e9e0613fd8129de52affea9ef3f;hb=20a36385164677ba84d7f4159eb65b3e9c42d952;hp=e27d46e9bd4433cfe65278845ef7f59f7efbf49a;hpb=7032754482cc88c4a9cce424a4717de7165fee17;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index e27d46e..530f223 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -127,25 +127,60 @@ func (params *RMRParams) String() string { //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient { - p := C.CString(protPort) - m := C.int(maxSize) - c := C.int(threadType) +type RMRClientParams struct { + protPort string + maxSize int + threadType int + statDesc string + lowLatency bool + fastAck bool +} + +func (params *RMRClientParams) String() string { + return fmt.Sprintf("protPort=%s maxSize=%d threadType=%d statDesc=%s lowLatency=%t fastAck=%t", + params.protPort, params.maxSize, params.threadType, params.statDesc, params.lowLatency, params.fastAck) +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { + p := C.CString(params.protPort) + m := C.int(params.maxSize) + c := C.int(params.threadType) defer C.free(unsafe.Pointer(p)) ctx := C.rmr_init(p, m, c) if ctx == nil { Logger.Error("rmrClient: Initializing RMR context failed, bailing out!") } + + Logger.Info("new rmrClient with parameters: %s", params.String()) + + if params.lowLatency { + C.rmr_set_low_latency(ctx) + } + if params.fastAck { + C.rmr_set_fack(ctx) + } + return &RMRClient{ - protPort: protPort, + protPort: params.protPort, context: ctx, consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.statDesc), } } func NewRMRClient() *RMRClient { - return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR") + return NewRMRClientWithParams( + &RMRClientParams{ + protPort: viper.GetString("rmr.protPort"), + maxSize: viper.GetInt("rmr.maxSize"), + threadType: viper.GetInt("rmr.threadType"), + statDesc: "RMR", + lowLatency: viper.GetBool("rmr.lowLatency"), + fastAck: viper.GetBool("rmr.fastAck"), + }) } func (m *RMRClient) Start(c MessageConsumer) {