Change-Id: I18d9ed40a8d87e255197eeafa912f904cb0646bf
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
-func NewRMRClient() *RMRClient {
- p := C.CString(viper.GetString("rmr.protPort"))
- m := C.int(viper.GetInt("rmr.maxSize"))
+func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
+ p := C.CString(protPort)
+ m := C.int(maxSize)
defer C.free(unsafe.Pointer(p))
ctx := C.rmr_init(p, m, C.int(0))
defer C.free(unsafe.Pointer(p))
ctx := C.rmr_init(p, m, C.int(0))
- context: ctx,
- consumers: make([]MessageConsumer, 0),
- stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+ protPort: protPort,
+ numWorkers: numWorkers,
+ context: ctx,
+ consumers: make([]MessageConsumer, 0),
+ stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
+func NewRMRClient() *RMRClient {
+ return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
+}
+
func (m *RMRClient) Start(c MessageConsumer) {
if c != nil {
m.consumers = append(m.consumers, c)
func (m *RMRClient) Start(c MessageConsumer) {
if c != nil {
m.consumers = append(m.consumers, c)
}
time.Sleep(10 * time.Second)
}
}
time.Sleep(10 * time.Second)
}
- m.wg.Add(viper.GetInt("rmr.numWorkers"))
if m.readyCb != nil {
go m.readyCb(m.readyCbParams)
}
if m.readyCb != nil {
go m.readyCb(m.readyCbParams)
}
- for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
+ for w := 0; w < m.numWorkers; w++ {
go m.Worker("worker-"+strconv.Itoa(w), 0)
}
m.Wait()
}
func (m *RMRClient) Worker(taskName string, msgSize int) {
go m.Worker("worker-"+strconv.Itoa(w), 0)
}
m.Wait()
}
func (m *RMRClient) Worker(taskName string, msgSize int) {
- p := viper.GetString("rmr.protPort")
- Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
+ Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
txBuffer := params.Mbuf
if txBuffer == nil {
txBuffer = m.Allocate()
txBuffer := params.Mbuf
if txBuffer == nil {
txBuffer = m.Allocate()
+ if txBuffer == nil {
+ return false
+ }
}
txBuffer.mtype = C.int(params.Mtype)
}
txBuffer.mtype = C.int(params.Mtype)
type RMRStatistics struct{}
type RMRClient struct {
type RMRStatistics struct{}
type RMRClient struct {
+ protPort string
+ numWorkers int
context unsafe.Pointer
ready int
wg sync.WaitGroup
context unsafe.Pointer
ready int
wg sync.WaitGroup
var (
// XApp is an application instance
var (
// XApp is an application instance
- Rmr *RMRClient
- Sdl *SDLClient
- Rnib *RNIBClient
- Resource *Router
- Metric *Metrics
- Logger *Log
- Config Configurator
+ Rmr *RMRClient
+ Sdl *SDLClient
+ Rnib *RNIBClient
+ Resource *Router
+ Metric *Metrics
+ Logger *Log
+ Config Configurator
+ readyCb ReadyCB
+ readyCbParams interface{}
- return Rmr.IsReady() && Sdl.IsReady()
+ return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
}
func SetReadyCB(cb ReadyCB, params interface{}) {
}
func SetReadyCB(cb ReadyCB, params interface{}) {
- Rmr.SetReadyCB(cb, params)
+ readyCb = cb
+ readyCbParams = params
+}
+
+func xappReadyCb(params interface{}) {
+ if readyCb != nil {
+ readyCb(readyCbParams)
+ }
Resource = NewRouter()
Config = Configurator{}
Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
Resource = NewRouter()
Config = Configurator{}
Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
if viper.IsSet("db.namespaces") {
namespaces := viper.GetStringSlice("db.namespaces")
if viper.IsSet("db.namespaces") {
namespaces := viper.GetStringSlice("db.namespaces")
-func Run(c MessageConsumer) {
+func RunWithParams(c MessageConsumer, sdlcheck bool) {
+ Rmr = NewRMRClient()
+ Rmr.SetReadyCB(xappReadyCb, nil)
go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
+ if sdlcheck {
+ Sdl.TestConnection()
+ }
+
+func Run(c MessageConsumer) {
+ RunWithParams(c, true)
+}