//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
- p := C.CString(protPort)
- m := C.int(maxSize)
- c := C.int(threadType)
+type RMRClientParams struct {
+ protPort string
+ maxSize int
+ threadType int
+ statDesc string
+ lowLatency bool
+ fastAck bool
+}
+
+func (params *RMRClientParams) String() string {
+ return fmt.Sprintf("protPort=%s maxSize=%d threadType=%d statDesc=%s lowLatency=%t fastAck=%t",
+ params.protPort, params.maxSize, params.threadType, params.statDesc, params.lowLatency, params.fastAck)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
+ p := C.CString(params.protPort)
+ m := C.int(params.maxSize)
+ c := C.int(params.threadType)
defer C.free(unsafe.Pointer(p))
ctx := C.rmr_init(p, m, c)
if ctx == nil {
Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
}
+
+ Logger.Info("new rmrClient with parameters: %s", params.String())
+
+ if params.lowLatency {
+ C.rmr_set_low_latency(ctx)
+ }
+ if params.fastAck {
+ C.rmr_set_fack(ctx)
+ }
+
return &RMRClient{
- protPort: protPort,
+ protPort: params.protPort,
context: ctx,
consumers: make([]MessageConsumer, 0),
- stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
+ stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.statDesc),
}
}
func NewRMRClient() *RMRClient {
- return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
+ return NewRMRClientWithParams(
+ &RMRClientParams{
+ protPort: viper.GetString("rmr.protPort"),
+ maxSize: viper.GetInt("rmr.maxSize"),
+ threadType: viper.GetInt("rmr.threadType"),
+ statDesc: "RMR",
+ lowLatency: viper.GetBool("rmr.lowLatency"),
+ fastAck: viper.GetBool("rmr.fastAck"),
+ })
}
func (m *RMRClient) Start(c MessageConsumer) {