X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Frmr.go;h=028cc3c9473aa18af2948b27409775063c729ddd;hb=79f0680fd7bbf1c8a8c6e2a842cb18020e387a47;hp=f2d09dacc63be5606904e2d58296be0ca4c3ec90;hpb=b8b191fe31602c53414f8086381aef1a48e8be09;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index f2d09da..028cc3c 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -70,13 +70,22 @@ import ( "strings" "time" "unsafe" + + "github.com/spf13/viper" ) var RMRCounterOpts = []CounterOpts{ {Name: "Transmitted", Help: "The total number of transmited RMR messages"}, - {Name: "Received", Help: "The total number of received RMR messages"}, {Name: "TransmitError", Help: "The total number of RMR transmission errors"}, + {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"}, + {Name: "Received", Help: "The total number of received RMR messages"}, {Name: "ReceiveError", Help: "The total number of RMR receive errors"}, + {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"}, +} + +var RMRGaugeOpts = []CounterOpts{ + {Name: "Enqueued", Help: "The total number of enqueued in RMR library"}, + {Name: "Dropped", Help: "The total number of dropped in RMR library"}, } var RMRErrors = map[int]string{ @@ -132,8 +141,9 @@ type RMRClientParams struct { } func (params *RMRClientParams) String() string { - return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t", - params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc, params.RmrData.LowLatency, params.RmrData.FastAck) + return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v", + params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc, + params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies) } //----------------------------------------------------------------------------- @@ -161,13 +171,24 @@ func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { return &RMRClient{ context: ctx, consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc), + statc: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc), + statg: Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc), maxRetryOnFailure: params.RmrData.MaxRetryOnFailure, } } func NewRMRClient() *RMRClient { - p := GetPortData("rmr-data") + p := GetPortData("rmrdata") + if p.Port == 0 || viper.IsSet("rmr.protPort") { + // Old xApp descriptor used, fallback to rmr section + fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port) + p.MaxSize = viper.GetInt("rmr.maxSize") + p.ThreadType = viper.GetInt("rmr.threadType") + p.LowLatency = viper.GetBool("rmr.lowLatency") + p.FastAck = viper.GetBool("rmr.fastAck") + p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure") + } + return NewRMRClientWithParams( &RMRClientParams{ RmrData: p, @@ -195,12 +216,12 @@ func (m *RMRClient) Start(c MessageConsumer) { time.Sleep(1 * time.Second) counter++ } - m.wg.Add(1) if m.readyCb != nil { go m.readyCb(m.readyCbParams) } + m.wg.Add(1) go func() { m.contextMux.Lock() rfd := C.rmr_get_rcvfd(m.context) @@ -209,6 +230,7 @@ func (m *RMRClient) Start(c MessageConsumer) { defer m.wg.Done() for { + if int(C.wait_epoll(efd, rfd)) == 0 { continue } @@ -226,9 +248,30 @@ func (m *RMRClient) Start(c MessageConsumer) { } }() + m.wg.Add(1) + go func() { + defer m.wg.Done() + for { + m.UpdateRmrStats() + time.Sleep(1 * time.Second) + } + }() + m.wg.Wait() } +func (m *RMRClient) UpdateRmrStats() { + param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{})))) + m.contextMux.Lock() + C.rmr_get_rx_debug_info(m.context, param) + m.contextMux.Unlock() + m.mux.Lock() + m.statg["Enqueued"].Set(float64(param.enqueue)) + m.statg["Dropped"].Set(float64(param.drop)) + m.mux.Unlock() + C.free(unsafe.Pointer(param)) +} + func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) { if len(m.consumers) == 0 { Logger.Info("rmrClient: No message handlers defined, message discarded!") @@ -267,19 +310,21 @@ func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) { return } - // Special case for multiple consumers - for _, c := range m.consumers { - cptr := unsafe.Pointer(rxBuffer.payload) - params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len)) - params.PayloadLen = int(rxBuffer.len) - params.Mtype = int(rxBuffer.mtype) - params.SubId = int(rxBuffer.sub_id) + /* + // Special case for multiple consumers + for _, c := range m.consumers { + cptr := unsafe.Pointer(rxBuffer.payload) + params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len)) + params.PayloadLen = int(rxBuffer.len) + params.Mtype = int(rxBuffer.mtype) + params.SubId = int(rxBuffer.sub_id) - err := c.Consume(params) - if err != nil { - Logger.Warn("rmrClient: Consumer returned error: %v", err) + err := c.Consume(params) + if err != nil { + Logger.Warn("rmrClient: Consumer returned error: %v", err) + } } - } + */ } func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t { @@ -325,6 +370,7 @@ func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duratio for ; i < int(to)*2 && status == false; i++ { status = m.Send(params, isRts) if status == false { + m.UpdateStatCounter("SendWithRetryRetry") time.Sleep(500 * time.Millisecond) } } @@ -439,6 +485,7 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int { } } m.contextMux.Unlock() + m.UpdateStatCounter("TransmitRetry") } if currBuffer == nil { @@ -530,12 +577,13 @@ func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool { func (m *RMRClient) UpdateStatCounter(name string) { m.mux.Lock() - m.stat[name].Inc() + m.statc[name].Inc() m.mux.Unlock() } func (m *RMRClient) RegisterMetrics() { - m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR") + m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR") + m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR") } func (m *RMRClient) Wait() {