From f49009aad3b16524cb53ae954a0711c17ec4f82a Mon Sep 17 00:00:00 2001 From: Juha Hyttinen Date: Tue, 26 Nov 2019 10:28:14 +0200 Subject: [PATCH] Made possible to have multiple rmrclient instances Change-Id: I18d9ed40a8d87e255197eeafa912f904cb0646bf Signed-off-by: Juha Hyttinen --- pkg/xapp/rmr.go | 28 ++++++++++++++++++---------- pkg/xapp/types.go | 2 ++ pkg/xapp/xapp.go | 42 ++++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index d90da02..3cb4f84 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -83,9 +83,9 @@ type RMRParams struct { Mbuf *C.rmr_mbuf_t } -func NewRMRClient() *RMRClient { - p := C.CString(viper.GetString("rmr.protPort")) - m := C.int(viper.GetInt("rmr.maxSize")) +func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient { + p := C.CString(protPort) + m := C.int(maxSize) defer C.free(unsafe.Pointer(p)) ctx := C.rmr_init(p, m, C.int(0)) @@ -94,12 +94,18 @@ func NewRMRClient() *RMRClient { } return &RMRClient{ - context: ctx, - consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"), + protPort: protPort, + numWorkers: numWorkers, + context: ctx, + consumers: make([]MessageConsumer, 0), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc), } } +func NewRMRClient() *RMRClient { + return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR") +} + func (m *RMRClient) Start(c MessageConsumer) { if c != nil { m.consumers = append(m.consumers, c) @@ -113,21 +119,20 @@ func (m *RMRClient) Start(c MessageConsumer) { } time.Sleep(10 * time.Second) } - m.wg.Add(viper.GetInt("rmr.numWorkers")) + m.wg.Add(m.numWorkers) if m.readyCb != nil { go m.readyCb(m.readyCbParams) } - for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ { + for w := 0; w < m.numWorkers; w++ { go m.Worker("worker-"+strconv.Itoa(w), 0) } m.Wait() } func (m *RMRClient) Worker(taskName string, msgSize int) { - p := viper.GetString("rmr.protPort") - Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p) + Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort) defer m.wg.Done() for { @@ -209,6 +214,9 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool { txBuffer := params.Mbuf if txBuffer == nil { txBuffer = m.Allocate() + if txBuffer == nil { + return false + } } txBuffer.mtype = C.int(params.Mtype) diff --git a/pkg/xapp/types.go b/pkg/xapp/types.go index 110244b..2e1eb09 100644 --- a/pkg/xapp/types.go +++ b/pkg/xapp/types.go @@ -27,6 +27,8 @@ import ( type RMRStatistics struct{} type RMRClient struct { + protPort string + numWorkers int context unsafe.Pointer ready int wg sync.WaitGroup diff --git a/pkg/xapp/xapp.go b/pkg/xapp/xapp.go index 1fd7ad6..45a3564 100644 --- a/pkg/xapp/xapp.go +++ b/pkg/xapp/xapp.go @@ -29,21 +29,30 @@ type ReadyCB func(interface{}) var ( // XApp is an application instance - Rmr *RMRClient - Sdl *SDLClient - Rnib *RNIBClient - Resource *Router - Metric *Metrics - Logger *Log - Config Configurator + Rmr *RMRClient + Sdl *SDLClient + Rnib *RNIBClient + Resource *Router + Metric *Metrics + Logger *Log + Config Configurator + readyCb ReadyCB + readyCbParams interface{} ) func IsReady() bool { - return Rmr.IsReady() && Sdl.IsReady() + return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady() } func SetReadyCB(cb ReadyCB, params interface{}) { - Rmr.SetReadyCB(cb, params) + readyCb = cb + readyCbParams = params +} + +func xappReadyCb(params interface{}) { + if readyCb != nil { + readyCb(readyCbParams) + } } func init() { @@ -54,7 +63,6 @@ func init() { Resource = NewRouter() Config = Configurator{} Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router) - Rmr = NewRMRClient() if viper.IsSet("db.namespaces") { namespaces := viper.GetStringSlice("db.namespaces") @@ -69,11 +77,17 @@ func init() { } } -func Run(c MessageConsumer) { +func RunWithParams(c MessageConsumer, sdlcheck bool) { + Rmr = NewRMRClient() + Rmr.SetReadyCB(xappReadyCb, nil) go http.ListenAndServe(viper.GetString("local.host"), Resource.router) - Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host"))) - - Sdl.TestConnection() + if sdlcheck { + Sdl.TestConnection() + } Rmr.Start(c) } + +func Run(c MessageConsumer) { + RunWithParams(c, true) +} -- 2.16.6