Add configurability for RMR low latency and fast ACK 92/4792/1
authorTimo Tietavainen <timo.tietavainen@nokia.com>
Wed, 30 Sep 2020 09:34:00 +0000 (12:34 +0300)
committerTimo Tietavainen <timo.tietavainen@nokia.com>
Wed, 30 Sep 2020 09:34:00 +0000 (12:34 +0300)
Implement new configuration parameters 'lowLatency' and 'fastAck' into
xapp-frame to activate RMR's low latency (TCP NO_DELAY) feature and fast TCP
acknowledgements feature. By default, both of these features are disabled.
Note also that even if these features are activated in configuration, it
depends on the transport library underneath the RMR library whether the feature
is actually supported or not.

Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: I720b325f6d3c23054c50b4a3548aed19d0725b5d

config/config-file.yaml
pkg/xapp/rmr.go

index daeb14f..3f70925 100755 (executable)
@@ -27,6 +27,8 @@
       "id" : 55555
     - "name": "TESTNAME2"
       "id" : 55556
+  "lowLatency": False
+  "fastAck": False
 "subscription":
     "host": "localhost:8088"
     "timeout": 2
index e27d46e..530f223 100755 (executable)
@@ -127,25 +127,60 @@ func (params *RMRParams) String() string {
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
-       p := C.CString(protPort)
-       m := C.int(maxSize)
-       c := C.int(threadType)
+type RMRClientParams struct {
+       protPort   string
+       maxSize    int
+       threadType int
+       statDesc   string
+       lowLatency bool
+       fastAck    bool
+}
+
+func (params *RMRClientParams) String() string {
+       return fmt.Sprintf("protPort=%s maxSize=%d threadType=%d statDesc=%s lowLatency=%t fastAck=%t",
+               params.protPort, params.maxSize, params.threadType, params.statDesc, params.lowLatency, params.fastAck)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
+       p := C.CString(params.protPort)
+       m := C.int(params.maxSize)
+       c := C.int(params.threadType)
        defer C.free(unsafe.Pointer(p))
        ctx := C.rmr_init(p, m, c)
        if ctx == nil {
                Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
        }
+
+       Logger.Info("new rmrClient with parameters: %s", params.String())
+
+       if params.lowLatency {
+               C.rmr_set_low_latency(ctx)
+       }
+       if params.fastAck {
+               C.rmr_set_fack(ctx)
+       }
+
        return &RMRClient{
-               protPort:  protPort,
+               protPort:  params.protPort,
                context:   ctx,
                consumers: make([]MessageConsumer, 0),
-               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
+               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, params.statDesc),
        }
 }
 
 func NewRMRClient() *RMRClient {
-       return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
+       return NewRMRClientWithParams(
+               &RMRClientParams{
+                       protPort:   viper.GetString("rmr.protPort"),
+                       maxSize:    viper.GetInt("rmr.maxSize"),
+                       threadType: viper.GetInt("rmr.threadType"),
+                       statDesc:   "RMR",
+                       lowLatency: viper.GetBool("rmr.lowLatency"),
+                       fastAck:    viper.GetBool("rmr.fastAck"),
+               })
 }
 
 func (m *RMRClient) Start(c MessageConsumer) {