mux sync.Mutex
stat map[string]Counter
consumers []MessageConsumer
+ readyCb ReadyCB
}
type MessageConsumer interface {
}
func NewRMRClient() *RMRClient {
- r := &RMRClient{}
- r.consumers = make([]MessageConsumer, 0)
-
p := C.CString(viper.GetString("rmr.protPort"))
m := C.int(viper.GetInt("rmr.maxSize"))
defer C.free(unsafe.Pointer(p))
- r.context = C.rmr_init(p, m, C.int(0))
- if r.context == nil {
+ ctx := C.rmr_init(p, m, C.int(0))
+ if ctx == nil {
Logger.Fatal("rmrClient: Initializing RMR context failed, bailing out!")
}
- return r
+ return &RMRClient{
+ context: ctx,
+ consumers: make([]MessageConsumer, 0),
+ stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+ }
}
func (m *RMRClient) Start(c MessageConsumer) {
- m.RegisterMetrics()
-
for {
Logger.Info("rmrClient: Waiting for RMR to be ready ...")
go m.Worker("worker-"+strconv.Itoa(w), 0)
}
+ if m.readyCb != nil {
+ m.readyCb()
+ }
+
m.Wait()
}
func (m *RMRClient) GetRicMessageId(mid string) int {
return RICMessageTypes[mid]
}
+
+func (m *RMRClient) SetReadyCB(cb ReadyCB) {
+ m.readyCb = cb
+}