X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Frmr.go;h=e910ec1fbc7bd45786af901df3dd6be75b3b3af9;hb=349a098dffbe8db49a067f3c9b37c44d891ce704;hp=9b1edd77451305dcc1e2308ea32bfff3d5327f70;hpb=127f16726e208a904dda9d6f9c5016df86064438;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index 9b1edd7..e910ec1 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -61,6 +61,7 @@ type RMRClient struct { mux sync.Mutex stat map[string]Counter consumers []MessageConsumer + readyCb ReadyCB } type MessageConsumer interface { @@ -68,24 +69,23 @@ type MessageConsumer interface { } func NewRMRClient() *RMRClient { - r := &RMRClient{} - r.consumers = make([]MessageConsumer, 0) - p := C.CString(viper.GetString("rmr.protPort")) m := C.int(viper.GetInt("rmr.maxSize")) defer C.free(unsafe.Pointer(p)) - r.context = C.rmr_init(p, m, C.int(0)) - if r.context == nil { + ctx := C.rmr_init(p, m, C.int(0)) + if ctx == nil { Logger.Fatal("rmrClient: Initializing RMR context failed, bailing out!") } - return r + return &RMRClient{ + context: ctx, + consumers: make([]MessageConsumer, 0), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"), + } } func (m *RMRClient) Start(c MessageConsumer) { - m.RegisterMetrics() - for { Logger.Info("rmrClient: Waiting for RMR to be ready ...") @@ -104,6 +104,10 @@ func (m *RMRClient) Start(c MessageConsumer) { go m.Worker("worker-"+strconv.Itoa(w), 0) } + if m.readyCb != nil { + m.readyCb() + } + m.Wait() } @@ -215,3 +219,7 @@ func (m *RMRClient) IsReady() bool { func (m *RMRClient) GetRicMessageId(mid string) int { return RICMessageTypes[mid] } + +func (m *RMRClient) SetReadyCB(cb ReadyCB) { + m.readyCb = cb +}