X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pkg%2Fxapp%2Frmr.go;h=3cb4f8481866919390fc2fbfa757f48c5a348a0b;hb=refs%2Fchanges%2F76%2F1776%2F2;hp=d90da022b7a8abf28cd5dcb14c4282f6dfbe6abb;hpb=467a99f0a3e5352ce77514ca2e3f9cc316fa9afc;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index d90da02..3cb4f84 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -83,9 +83,9 @@ type RMRParams struct { Mbuf *C.rmr_mbuf_t } -func NewRMRClient() *RMRClient { - p := C.CString(viper.GetString("rmr.protPort")) - m := C.int(viper.GetInt("rmr.maxSize")) +func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient { + p := C.CString(protPort) + m := C.int(maxSize) defer C.free(unsafe.Pointer(p)) ctx := C.rmr_init(p, m, C.int(0)) @@ -94,12 +94,18 @@ func NewRMRClient() *RMRClient { } return &RMRClient{ - context: ctx, - consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"), + protPort: protPort, + numWorkers: numWorkers, + context: ctx, + consumers: make([]MessageConsumer, 0), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc), } } +func NewRMRClient() *RMRClient { + return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR") +} + func (m *RMRClient) Start(c MessageConsumer) { if c != nil { m.consumers = append(m.consumers, c) @@ -113,21 +119,20 @@ func (m *RMRClient) Start(c MessageConsumer) { } time.Sleep(10 * time.Second) } - m.wg.Add(viper.GetInt("rmr.numWorkers")) + m.wg.Add(m.numWorkers) if m.readyCb != nil { go m.readyCb(m.readyCbParams) } - for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ { + for w := 0; w < m.numWorkers; w++ { go m.Worker("worker-"+strconv.Itoa(w), 0) } m.Wait() } func (m *RMRClient) Worker(taskName string, msgSize int) { - p := viper.GetString("rmr.protPort") - Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p) + Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort) defer m.wg.Done() for { @@ -209,6 +214,9 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool { txBuffer := params.Mbuf if txBuffer == nil { txBuffer = m.Allocate() + if txBuffer == nil { + return false + } } txBuffer.mtype = C.int(params.Mtype)