Made possible to have multiple rmrclient instances
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
index d90da02..3cb4f84 100755 (executable)
@@ -83,9 +83,9 @@ type RMRParams struct {
        Mbuf       *C.rmr_mbuf_t
 }
 
-func NewRMRClient() *RMRClient {
-       p := C.CString(viper.GetString("rmr.protPort"))
-       m := C.int(viper.GetInt("rmr.maxSize"))
+func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
+       p := C.CString(protPort)
+       m := C.int(maxSize)
        defer C.free(unsafe.Pointer(p))
 
        ctx := C.rmr_init(p, m, C.int(0))
@@ -94,12 +94,18 @@ func NewRMRClient() *RMRClient {
        }
 
        return &RMRClient{
-               context:   ctx,
-               consumers: make([]MessageConsumer, 0),
-               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+               protPort:   protPort,
+               numWorkers: numWorkers,
+               context:    ctx,
+               consumers:  make([]MessageConsumer, 0),
+               stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
        }
 }
 
+func NewRMRClient() *RMRClient {
+       return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
+}
+
 func (m *RMRClient) Start(c MessageConsumer) {
        if c != nil {
                m.consumers = append(m.consumers, c)
@@ -113,21 +119,20 @@ func (m *RMRClient) Start(c MessageConsumer) {
                }
                time.Sleep(10 * time.Second)
        }
-       m.wg.Add(viper.GetInt("rmr.numWorkers"))
+       m.wg.Add(m.numWorkers)
 
        if m.readyCb != nil {
                go m.readyCb(m.readyCbParams)
        }
 
-       for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
+       for w := 0; w < m.numWorkers; w++ {
                go m.Worker("worker-"+strconv.Itoa(w), 0)
        }
        m.Wait()
 }
 
 func (m *RMRClient) Worker(taskName string, msgSize int) {
-       p := viper.GetString("rmr.protPort")
-       Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
+       Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
 
        defer m.wg.Done()
        for {
@@ -209,6 +214,9 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
        txBuffer := params.Mbuf
        if txBuffer == nil {
                txBuffer = m.Allocate()
+               if txBuffer == nil {
+                       return false
+               }
        }
 
        txBuffer.mtype = C.int(params.Mtype)