Merge "Newest RMR release 4.2.2" v0.5.4
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Mon, 5 Oct 2020 12:12:14 +0000 (12:12 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Mon, 5 Oct 2020 12:12:14 +0000 (12:12 +0000)
config/config-file.yaml
pkg/xapp/rmr.go

index daeb14f..3f70925 100755 (executable)
@@ -27,6 +27,8 @@
       "id" : 55555
     - "name": "TESTNAME2"
       "id" : 55556
+  "lowLatency": False
+  "fastAck": False
 "subscription":
     "host": "localhost:8088"
     "timeout": 2
index e27d46e..530f223 100755 (executable)
@@ -127,25 +127,60 @@ func (params *RMRParams) String() string {
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
-       p := C.CString(protPort)
-       m := C.int(maxSize)
-       c := C.int(threadType)
+type RMRClientParams struct {
+       protPort   string
+       maxSize    int
+       threadType int
+       statDesc   string
+       lowLatency bool
+       fastAck    bool
+}
+
+func (params *RMRClientParams) String() string {
+       return fmt.Sprintf("protPort=%s maxSize=%d threadType=%d statDesc=%s lowLatency=%t fastAck=%t",
+               params.protPort, params.maxSize, params.threadType, params.statDesc, params.lowLatency, params.fastAck)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
+       p := C.CString(params.protPort)
+       m := C.int(params.maxSize)
+       c := C.int(params.threadType)
        defer C.free(unsafe.Pointer(p))
        ctx := C.rmr_init(p, m, c)
        if ctx == nil {
                Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
        }
+
+       Logger.Info("new rmrClient with parameters: %s", params.String())
+
+       if params.lowLatency {
+               C.rmr_set_low_latency(ctx)
+       }
+       if params.fastAck {
+               C.rmr_set_fack(ctx)
+       }
+
        return &RMRClient{
-               protPort:  protPort,
+               protPort:  params.protPort,
                context:   ctx,
                consumers: make([]MessageConsumer, 0),
-               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
+               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, params.statDesc),
        }
 }
 
 func NewRMRClient() *RMRClient {
-       return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
+       return NewRMRClientWithParams(
+               &RMRClientParams{
+                       protPort:   viper.GetString("rmr.protPort"),
+                       maxSize:    viper.GetInt("rmr.maxSize"),
+                       threadType: viper.GetInt("rmr.threadType"),
+                       statDesc:   "RMR",
+                       lowLatency: viper.GetBool("rmr.lowLatency"),
+                       fastAck:    viper.GetBool("rmr.fastAck"),
+               })
 }
 
 func (m *RMRClient) Start(c MessageConsumer) {