Add new interfaces
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
index 9b1edd7..c835408 100755 (executable)
@@ -55,12 +55,14 @@ var RMRCounterOpts = []CounterOpts{
 type RMRStatistics struct{}
 
 type RMRClient struct {
-       context   unsafe.Pointer
-       ready     int
-       wg        sync.WaitGroup
-       mux       sync.Mutex
-       stat      map[string]Counter
-       consumers []MessageConsumer
+       context       unsafe.Pointer
+       ready         int
+       wg            sync.WaitGroup
+       mux           sync.Mutex
+       stat          map[string]Counter
+       consumers     []MessageConsumer
+       readyCb       ReadyCB
+       readyCbParams interface{}
 }
 
 type MessageConsumer interface {
@@ -68,24 +70,23 @@ type MessageConsumer interface {
 }
 
 func NewRMRClient() *RMRClient {
-       r := &RMRClient{}
-       r.consumers = make([]MessageConsumer, 0)
-
        p := C.CString(viper.GetString("rmr.protPort"))
        m := C.int(viper.GetInt("rmr.maxSize"))
        defer C.free(unsafe.Pointer(p))
 
-       r.context = C.rmr_init(p, m, C.int(0))
-       if r.context == nil {
-               Logger.Fatal("rmrClient: Initializing RMR context failed, bailing out!")
+       ctx := C.rmr_init(p, m, C.int(0))
+       if ctx == nil {
+               Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
        }
 
-       return r
+       return &RMRClient{
+               context:   ctx,
+               consumers: make([]MessageConsumer, 0),
+               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+       }
 }
 
 func (m *RMRClient) Start(c MessageConsumer) {
-       m.RegisterMetrics()
-
        for {
                Logger.Info("rmrClient: Waiting for RMR to be ready ...")
 
@@ -104,6 +105,10 @@ func (m *RMRClient) Start(c MessageConsumer) {
                go m.Worker("worker-"+strconv.Itoa(w), 0)
        }
 
+       if m.readyCb != nil {
+               m.readyCb(m.readyCbParams)
+       }
+
        m.Wait()
 }
 
@@ -144,7 +149,7 @@ func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
        buf := C.rmr_alloc_msg(m.context, 0)
        if buf == nil {
-               Logger.Fatal("rmrClient: Allocating message buffer failed!")
+               Logger.Error("rmrClient: Allocating message buffer failed!")
        }
 
        return buf
@@ -212,6 +217,21 @@ func (m *RMRClient) IsReady() bool {
        return m.ready != 0
 }
 
-func (m *RMRClient) GetRicMessageId(mid string) int {
-       return RICMessageTypes[mid]
+func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
+       m.readyCb = cb
+       m.readyCbParams = params
+}
+
+func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
+       id, ok := RICMessageTypes[name]
+       return id, ok
+}
+
+func (m *RMRClient) GetRicMessageName(id int) (s string) {
+       for k, v := range RICMessageTypes {
+               if id == v {
+                       return k
+               }
+       }
+       return
 }