type RMRStatistics struct{}
type RMRClient struct {
- context unsafe.Pointer
- ready int
- wg sync.WaitGroup
- mux sync.Mutex
- stat map[string]Counter
- consumers []MessageConsumer
+ context unsafe.Pointer
+ ready int
+ wg sync.WaitGroup
+ mux sync.Mutex
+ stat map[string]Counter
+ consumers []MessageConsumer
+ readyCb ReadyCB
+ readyCbParams interface{}
}
type MessageConsumer interface {
}
func NewRMRClient() *RMRClient {
- r := &RMRClient{}
- r.consumers = make([]MessageConsumer, 0)
-
p := C.CString(viper.GetString("rmr.protPort"))
m := C.int(viper.GetInt("rmr.maxSize"))
defer C.free(unsafe.Pointer(p))
- r.context = C.rmr_init(p, m, C.int(0))
- if r.context == nil {
- Logger.Fatal("rmrClient: Initializing RMR context failed, bailing out!")
+ ctx := C.rmr_init(p, m, C.int(0))
+ if ctx == nil {
+ Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
}
- return r
+ return &RMRClient{
+ context: ctx,
+ consumers: make([]MessageConsumer, 0),
+ stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+ }
}
func (m *RMRClient) Start(c MessageConsumer) {
- m.RegisterMetrics()
-
for {
Logger.Info("rmrClient: Waiting for RMR to be ready ...")
go m.Worker("worker-"+strconv.Itoa(w), 0)
}
+ if m.readyCb != nil {
+ m.readyCb(m.readyCbParams)
+ }
+
m.Wait()
}
func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
buf := C.rmr_alloc_msg(m.context, 0)
if buf == nil {
- Logger.Fatal("rmrClient: Allocating message buffer failed!")
+ Logger.Error("rmrClient: Allocating message buffer failed!")
}
return buf
return m.ready != 0
}
-func (m *RMRClient) GetRicMessageId(mid string) int {
- return RICMessageTypes[mid]
+func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
+ m.readyCb = cb
+ m.readyCbParams = params
+}
+
+func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
+ id, ok := RICMessageTypes[name]
+ return id, ok
+}
+
+func (m *RMRClient) GetRicMessageName(id int) (s string) {
+ for k, v := range RICMessageTypes {
+ if id == v {
+ return k
+ }
+ }
+ return
}