Mbuf *C.rmr_mbuf_t
}
-func NewRMRClient() *RMRClient {
- p := C.CString(viper.GetString("rmr.protPort"))
- m := C.int(viper.GetInt("rmr.maxSize"))
+func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
+ p := C.CString(protPort)
+ m := C.int(maxSize)
defer C.free(unsafe.Pointer(p))
ctx := C.rmr_init(p, m, C.int(0))
}
return &RMRClient{
- context: ctx,
- consumers: make([]MessageConsumer, 0),
- stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+ protPort: protPort,
+ numWorkers: numWorkers,
+ context: ctx,
+ consumers: make([]MessageConsumer, 0),
+ stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
}
}
+func NewRMRClient() *RMRClient {
+ return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
+}
+
func (m *RMRClient) Start(c MessageConsumer) {
if c != nil {
m.consumers = append(m.consumers, c)
}
time.Sleep(10 * time.Second)
}
- m.wg.Add(viper.GetInt("rmr.numWorkers"))
+ m.wg.Add(m.numWorkers)
if m.readyCb != nil {
go m.readyCb(m.readyCbParams)
}
- for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
+ for w := 0; w < m.numWorkers; w++ {
go m.Worker("worker-"+strconv.Itoa(w), 0)
}
m.Wait()
}
func (m *RMRClient) Worker(taskName string, msgSize int) {
- p := viper.GetString("rmr.protPort")
- Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
+ Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
defer m.wg.Done()
for {
txBuffer := params.Mbuf
if txBuffer == nil {
txBuffer = m.Allocate()
+ if txBuffer == nil {
+ return false
+ }
}
txBuffer.mtype = C.int(params.Mtype)
var (
// XApp is an application instance
- Rmr *RMRClient
- Sdl *SDLClient
- Rnib *RNIBClient
- Resource *Router
- Metric *Metrics
- Logger *Log
- Config Configurator
+ Rmr *RMRClient
+ Sdl *SDLClient
+ Rnib *RNIBClient
+ Resource *Router
+ Metric *Metrics
+ Logger *Log
+ Config Configurator
+ readyCb ReadyCB
+ readyCbParams interface{}
)
func IsReady() bool {
- return Rmr.IsReady() && Sdl.IsReady()
+ return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
}
func SetReadyCB(cb ReadyCB, params interface{}) {
- Rmr.SetReadyCB(cb, params)
+ readyCb = cb
+ readyCbParams = params
+}
+
+func xappReadyCb(params interface{}) {
+ if readyCb != nil {
+ readyCb(readyCbParams)
+ }
}
func init() {
Resource = NewRouter()
Config = Configurator{}
Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
- Rmr = NewRMRClient()
if viper.IsSet("db.namespaces") {
namespaces := viper.GetStringSlice("db.namespaces")
}
}
-func Run(c MessageConsumer) {
+func RunWithParams(c MessageConsumer, sdlcheck bool) {
+ Rmr = NewRMRClient()
+ Rmr.SetReadyCB(xappReadyCb, nil)
go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
-
Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
-
- Sdl.TestConnection()
+ if sdlcheck {
+ Sdl.TestConnection()
+ }
Rmr.Start(c)
}
+
+func Run(c MessageConsumer) {
+ RunWithParams(c, true)
+}