Mbuf *C.rmr_mbuf_t
}
-func NewRMRClient() *RMRClient {
- p := C.CString(viper.GetString("rmr.protPort"))
- m := C.int(viper.GetInt("rmr.maxSize"))
+func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
+ p := C.CString(protPort)
+ m := C.int(maxSize)
defer C.free(unsafe.Pointer(p))
ctx := C.rmr_init(p, m, C.int(0))
}
return &RMRClient{
- context: ctx,
- consumers: make([]MessageConsumer, 0),
- stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+ protPort: protPort,
+ numWorkers: numWorkers,
+ context: ctx,
+ consumers: make([]MessageConsumer, 0),
+ stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
}
}
+func NewRMRClient() *RMRClient {
+ return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
+}
+
func (m *RMRClient) Start(c MessageConsumer) {
if c != nil {
m.consumers = append(m.consumers, c)
}
time.Sleep(10 * time.Second)
}
- m.wg.Add(viper.GetInt("rmr.numWorkers"))
+ m.wg.Add(m.numWorkers)
if m.readyCb != nil {
go m.readyCb(m.readyCbParams)
}
- for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
+ for w := 0; w < m.numWorkers; w++ {
go m.Worker("worker-"+strconv.Itoa(w), 0)
}
m.Wait()
}
func (m *RMRClient) Worker(taskName string, msgSize int) {
- p := viper.GetString("rmr.protPort")
- Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
+ Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
defer m.wg.Done()
for {
txBuffer := params.Mbuf
if txBuffer == nil {
txBuffer = m.Allocate()
+ if txBuffer == nil {
+ return false
+ }
}
txBuffer.mtype = C.int(params.Mtype)