Merge "Newest RMR release 4.2.2"
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
index 9f988dc..530f223 100755 (executable)
@@ -127,25 +127,60 @@ func (params *RMRParams) String() string {
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
-       p := C.CString(protPort)
-       m := C.int(maxSize)
-       c := C.int(threadType)
+type RMRClientParams struct {
+       protPort   string
+       maxSize    int
+       threadType int
+       statDesc   string
+       lowLatency bool
+       fastAck    bool
+}
+
+func (params *RMRClientParams) String() string {
+       return fmt.Sprintf("protPort=%s maxSize=%d threadType=%d statDesc=%s lowLatency=%t fastAck=%t",
+               params.protPort, params.maxSize, params.threadType, params.statDesc, params.lowLatency, params.fastAck)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
+       p := C.CString(params.protPort)
+       m := C.int(params.maxSize)
+       c := C.int(params.threadType)
        defer C.free(unsafe.Pointer(p))
        ctx := C.rmr_init(p, m, c)
        if ctx == nil {
                Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
        }
+
+       Logger.Info("new rmrClient with parameters: %s", params.String())
+
+       if params.lowLatency {
+               C.rmr_set_low_latency(ctx)
+       }
+       if params.fastAck {
+               C.rmr_set_fack(ctx)
+       }
+
        return &RMRClient{
-               protPort:  protPort,
+               protPort:  params.protPort,
                context:   ctx,
                consumers: make([]MessageConsumer, 0),
-               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
+               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, params.statDesc),
        }
 }
 
 func NewRMRClient() *RMRClient {
-       return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
+       return NewRMRClientWithParams(
+               &RMRClientParams{
+                       protPort:   viper.GetString("rmr.protPort"),
+                       maxSize:    viper.GetInt("rmr.maxSize"),
+                       threadType: viper.GetInt("rmr.threadType"),
+                       statDesc:   "RMR",
+                       lowLatency: viper.GetBool("rmr.lowLatency"),
+                       fastAck:    viper.GetBool("rmr.fastAck"),
+               })
 }
 
 func (m *RMRClient) Start(c MessageConsumer) {
@@ -313,6 +348,10 @@ func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duratio
 
 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
 
+       if params == nil {
+               return nil
+       }
+
        payLen := len(params.Payload)
        if params.PayloadLen != 0 {
                payLen = params.PayloadLen
@@ -337,20 +376,21 @@ func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
        datap := C.CBytes(params.Payload)
        defer C.free(datap)
 
-       if params != nil {
-               if params.Meid != nil {
-                       b := make([]byte, int(C.RMR_MAX_MEID))
-                       copy(b, []byte(params.Meid.RanName))
-                       C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
-               }
-               xidLen := len(params.Xid)
-               if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
-                       b := make([]byte, int(C.RMR_MAX_XID))
-                       copy(b, []byte(params.Xid))
-                       C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
-               }
+       if params.Meid != nil {
+               b := make([]byte, int(C.RMR_MAX_MEID))
+               copy(b, []byte(params.Meid.RanName))
+               C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
+       }
+
+       xidLen := len(params.Xid)
+       if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
+               b := make([]byte, int(C.RMR_MAX_XID))
+               copy(b, []byte(params.Xid))
+               C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
        }
+
        C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
+
        return txBuffer
 }