X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pkg%2Fxapp%2Frmr.go;h=c835408fa303f19c83d1c5b1fcaa6c8fe47212f6;hb=192518d79ebcffe17bf569bf9037321a6fd26d44;hp=9b1edd77451305dcc1e2308ea32bfff3d5327f70;hpb=2e78e42c5896b61b77ab3a97e45704f6749161b2;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index 9b1edd7..c835408 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -55,12 +55,14 @@ var RMRCounterOpts = []CounterOpts{ type RMRStatistics struct{} type RMRClient struct { - context unsafe.Pointer - ready int - wg sync.WaitGroup - mux sync.Mutex - stat map[string]Counter - consumers []MessageConsumer + context unsafe.Pointer + ready int + wg sync.WaitGroup + mux sync.Mutex + stat map[string]Counter + consumers []MessageConsumer + readyCb ReadyCB + readyCbParams interface{} } type MessageConsumer interface { @@ -68,24 +70,23 @@ type MessageConsumer interface { } func NewRMRClient() *RMRClient { - r := &RMRClient{} - r.consumers = make([]MessageConsumer, 0) - p := C.CString(viper.GetString("rmr.protPort")) m := C.int(viper.GetInt("rmr.maxSize")) defer C.free(unsafe.Pointer(p)) - r.context = C.rmr_init(p, m, C.int(0)) - if r.context == nil { - Logger.Fatal("rmrClient: Initializing RMR context failed, bailing out!") + ctx := C.rmr_init(p, m, C.int(0)) + if ctx == nil { + Logger.Error("rmrClient: Initializing RMR context failed, bailing out!") } - return r + return &RMRClient{ + context: ctx, + consumers: make([]MessageConsumer, 0), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"), + } } func (m *RMRClient) Start(c MessageConsumer) { - m.RegisterMetrics() - for { Logger.Info("rmrClient: Waiting for RMR to be ready ...") @@ -104,6 +105,10 @@ func (m *RMRClient) Start(c MessageConsumer) { go m.Worker("worker-"+strconv.Itoa(w), 0) } + if m.readyCb != nil { + m.readyCb(m.readyCbParams) + } + m.Wait() } @@ -144,7 +149,7 @@ func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) { func (m *RMRClient) Allocate() *C.rmr_mbuf_t { buf := C.rmr_alloc_msg(m.context, 0) if buf == nil { - Logger.Fatal("rmrClient: Allocating message buffer failed!") + Logger.Error("rmrClient: Allocating message buffer failed!") } return buf @@ -212,6 +217,21 @@ func (m *RMRClient) IsReady() bool { return m.ready != 0 } -func (m *RMRClient) GetRicMessageId(mid string) int { - return RICMessageTypes[mid] +func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) { + m.readyCb = cb + m.readyCbParams = params +} + +func (m *RMRClient) GetRicMessageId(name string) (int, bool) { + id, ok := RICMessageTypes[name] + return id, ok +} + +func (m *RMRClient) GetRicMessageName(id int) (s string) { + for k, v := range RICMessageTypes { + if id == v { + return k + } + } + return }