Made possible to have multiple rmrclient instances 76/1776/2 v0.0.22
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Tue, 26 Nov 2019 08:28:14 +0000 (10:28 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Tue, 26 Nov 2019 10:07:46 +0000 (12:07 +0200)
Change-Id: I18d9ed40a8d87e255197eeafa912f904cb0646bf
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/xapp/rmr.go
pkg/xapp/types.go
pkg/xapp/xapp.go

index d90da02..3cb4f84 100755 (executable)
@@ -83,9 +83,9 @@ type RMRParams struct {
        Mbuf       *C.rmr_mbuf_t
 }
 
-func NewRMRClient() *RMRClient {
-       p := C.CString(viper.GetString("rmr.protPort"))
-       m := C.int(viper.GetInt("rmr.maxSize"))
+func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
+       p := C.CString(protPort)
+       m := C.int(maxSize)
        defer C.free(unsafe.Pointer(p))
 
        ctx := C.rmr_init(p, m, C.int(0))
@@ -94,12 +94,18 @@ func NewRMRClient() *RMRClient {
        }
 
        return &RMRClient{
-               context:   ctx,
-               consumers: make([]MessageConsumer, 0),
-               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+               protPort:   protPort,
+               numWorkers: numWorkers,
+               context:    ctx,
+               consumers:  make([]MessageConsumer, 0),
+               stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
        }
 }
 
+func NewRMRClient() *RMRClient {
+       return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
+}
+
 func (m *RMRClient) Start(c MessageConsumer) {
        if c != nil {
                m.consumers = append(m.consumers, c)
@@ -113,21 +119,20 @@ func (m *RMRClient) Start(c MessageConsumer) {
                }
                time.Sleep(10 * time.Second)
        }
-       m.wg.Add(viper.GetInt("rmr.numWorkers"))
+       m.wg.Add(m.numWorkers)
 
        if m.readyCb != nil {
                go m.readyCb(m.readyCbParams)
        }
 
-       for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
+       for w := 0; w < m.numWorkers; w++ {
                go m.Worker("worker-"+strconv.Itoa(w), 0)
        }
        m.Wait()
 }
 
 func (m *RMRClient) Worker(taskName string, msgSize int) {
-       p := viper.GetString("rmr.protPort")
-       Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
+       Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
 
        defer m.wg.Done()
        for {
@@ -209,6 +214,9 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
        txBuffer := params.Mbuf
        if txBuffer == nil {
                txBuffer = m.Allocate()
+               if txBuffer == nil {
+                       return false
+               }
        }
 
        txBuffer.mtype = C.int(params.Mtype)
index 110244b..2e1eb09 100644 (file)
@@ -27,6 +27,8 @@ import (
 type RMRStatistics struct{}
 
 type RMRClient struct {
+       protPort      string
+       numWorkers    int
        context       unsafe.Pointer
        ready         int
        wg            sync.WaitGroup
index 1fd7ad6..45a3564 100644 (file)
@@ -29,21 +29,30 @@ type ReadyCB func(interface{})
 
 var (
        // XApp is an application instance
-       Rmr      *RMRClient
-       Sdl      *SDLClient
-       Rnib     *RNIBClient
-       Resource *Router
-       Metric   *Metrics
-       Logger   *Log
-       Config   Configurator
+       Rmr           *RMRClient
+       Sdl           *SDLClient
+       Rnib          *RNIBClient
+       Resource      *Router
+       Metric        *Metrics
+       Logger        *Log
+       Config        Configurator
+       readyCb       ReadyCB
+       readyCbParams interface{}
 )
 
 func IsReady() bool {
-       return Rmr.IsReady() && Sdl.IsReady()
+       return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
 }
 
 func SetReadyCB(cb ReadyCB, params interface{}) {
-       Rmr.SetReadyCB(cb, params)
+       readyCb = cb
+       readyCbParams = params
+}
+
+func xappReadyCb(params interface{}) {
+       if readyCb != nil {
+               readyCb(readyCbParams)
+       }
 }
 
 func init() {
@@ -54,7 +63,6 @@ func init() {
        Resource = NewRouter()
        Config = Configurator{}
        Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
-       Rmr = NewRMRClient()
 
        if viper.IsSet("db.namespaces") {
                namespaces := viper.GetStringSlice("db.namespaces")
@@ -69,11 +77,17 @@ func init() {
        }
 }
 
-func Run(c MessageConsumer) {
+func RunWithParams(c MessageConsumer, sdlcheck bool) {
+       Rmr = NewRMRClient()
+       Rmr.SetReadyCB(xappReadyCb, nil)
        go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
-
        Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
-
-       Sdl.TestConnection()
+       if sdlcheck {
+               Sdl.TestConnection()
+       }
        Rmr.Start(c)
 }
+
+func Run(c MessageConsumer) {
+       RunWithParams(c, true)
+}