From 5a6861191068d31051efdb453f8b97237c8eadb1 Mon Sep 17 00:00:00 2001 From: Timo Tietavainen Date: Wed, 30 Sep 2020 12:34:00 +0300 Subject: [PATCH] Add configurability for RMR low latency and fast ACK Implement new configuration parameters 'lowLatency' and 'fastAck' into xapp-frame to activate RMR's low latency (TCP NO_DELAY) feature and fast TCP acknowledgements feature. By default, both of these features are disabled. Note also that even if these features are activated in configuration, it depends on the transport library underneath the RMR library whether the feature is actually supported or not. Signed-off-by: Timo Tietavainen Change-Id: I720b325f6d3c23054c50b4a3548aed19d0725b5d --- config/config-file.yaml | 2 ++ pkg/xapp/rmr.go | 49 ++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/config/config-file.yaml b/config/config-file.yaml index daeb14f..3f70925 100755 --- a/config/config-file.yaml +++ b/config/config-file.yaml @@ -27,6 +27,8 @@ "id" : 55555 - "name": "TESTNAME2" "id" : 55556 + "lowLatency": False + "fastAck": False "subscription": "host": "localhost:8088" "timeout": 2 diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index e27d46e..530f223 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -127,25 +127,60 @@ func (params *RMRParams) String() string { //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient { - p := C.CString(protPort) - m := C.int(maxSize) - c := C.int(threadType) +type RMRClientParams struct { + protPort string + maxSize int + threadType int + statDesc string + lowLatency bool + fastAck bool +} + +func (params *RMRClientParams) String() string { + return fmt.Sprintf("protPort=%s maxSize=%d threadType=%d statDesc=%s lowLatency=%t fastAck=%t", + params.protPort, params.maxSize, params.threadType, params.statDesc, params.lowLatency, params.fastAck) +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { + p := C.CString(params.protPort) + m := C.int(params.maxSize) + c := C.int(params.threadType) defer C.free(unsafe.Pointer(p)) ctx := C.rmr_init(p, m, c) if ctx == nil { Logger.Error("rmrClient: Initializing RMR context failed, bailing out!") } + + Logger.Info("new rmrClient with parameters: %s", params.String()) + + if params.lowLatency { + C.rmr_set_low_latency(ctx) + } + if params.fastAck { + C.rmr_set_fack(ctx) + } + return &RMRClient{ - protPort: protPort, + protPort: params.protPort, context: ctx, consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.statDesc), } } func NewRMRClient() *RMRClient { - return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR") + return NewRMRClientWithParams( + &RMRClientParams{ + protPort: viper.GetString("rmr.protPort"), + maxSize: viper.GetInt("rmr.maxSize"), + threadType: viper.GetInt("rmr.threadType"), + statDesc: "RMR", + lowLatency: viper.GetBool("rmr.lowLatency"), + fastAck: viper.GetBool("rmr.fastAck"), + }) } func (m *RMRClient) Start(c MessageConsumer) { -- 2.16.6