X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Frmr.go;h=59df7436cf6c16623a3de18cbfbb56fc6695d1d5;hb=9c523a68520e552d051ca922815aa936c8611894;hp=9342828c8482ef2174399bfb4c090d302fc178fa;hpb=dba83dfb6f135fb2f6987e55beec8552c6a8b74e;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index 9342828..59df743 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -29,6 +29,7 @@ package xapp #include #include + void write_bytes_array(unsigned char *dst, void *data, int len) { memcpy((void *)dst, (void *)data, len); } @@ -76,9 +77,16 @@ import ( var RMRCounterOpts = []CounterOpts{ {Name: "Transmitted", Help: "The total number of transmited RMR messages"}, - {Name: "Received", Help: "The total number of received RMR messages"}, {Name: "TransmitError", Help: "The total number of RMR transmission errors"}, + {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"}, + {Name: "Received", Help: "The total number of received RMR messages"}, {Name: "ReceiveError", Help: "The total number of RMR receive errors"}, + {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"}, +} + +var RMRGaugeOpts = []CounterOpts{ + {Name: "Enqueued", Help: "The total number of enqueued in RMR library"}, + {Name: "Dropped", Help: "The total number of dropped in RMR library"}, } var RMRErrors = map[int]string{ @@ -101,9 +109,9 @@ var RMRErrors = map[int]string{ C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request", } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- type RMRParams struct { Mtype int Payload []byte @@ -125,9 +133,9 @@ func (params *RMRParams) String() string { return b.String() } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- type RMRClientParams struct { StatDesc string RmrData PortData @@ -139,9 +147,9 @@ func (params *RMRClientParams) String() string { params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies) } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { p := C.CString(fmt.Sprintf("%d", params.RmrData.Port)) m := C.int(params.RmrData.MaxSize) @@ -164,7 +172,8 @@ func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { return &RMRClient{ context: ctx, consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc), + statc: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc), + statg: Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc), maxRetryOnFailure: params.RmrData.MaxRetryOnFailure, } } @@ -208,12 +217,12 @@ func (m *RMRClient) Start(c MessageConsumer) { time.Sleep(1 * time.Second) counter++ } - m.wg.Add(1) if m.readyCb != nil { go m.readyCb(m.readyCbParams) } + m.wg.Add(1) go func() { m.contextMux.Lock() rfd := C.rmr_get_rcvfd(m.context) @@ -222,6 +231,7 @@ func (m *RMRClient) Start(c MessageConsumer) { defer m.wg.Done() for { + if int(C.wait_epoll(efd, rfd)) == 0 { continue } @@ -239,9 +249,30 @@ func (m *RMRClient) Start(c MessageConsumer) { } }() + m.wg.Add(1) + go func() { + defer m.wg.Done() + for { + m.UpdateRmrStats() + time.Sleep(1 * time.Second) + } + }() + m.wg.Wait() } +func (m *RMRClient) UpdateRmrStats() { + param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{})))) + m.contextMux.Lock() + C.rmr_get_rx_debug_info(m.context, param) + m.contextMux.Unlock() + m.mux.Lock() + m.statg["Enqueued"].Set(float64(param.enqueue)) + m.statg["Dropped"].Set(float64(param.drop)) + m.mux.Unlock() + C.free(unsafe.Pointer(param)) +} + func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) { if len(m.consumers) == 0 { Logger.Info("rmrClient: No message handlers defined, message discarded!") @@ -340,6 +371,7 @@ func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duratio for ; i < int(to)*2 && status == false; i++ { status = m.Send(params, isRts) if status == false { + m.UpdateStatCounter("SendWithRetryRetry") time.Sleep(500 * time.Millisecond) } } @@ -415,61 +447,46 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool { } func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int { - var ( - currBuffer *C.rmr_mbuf_t - ) - - m.contextMux.Lock() txBuffer.state = 0 - if whid != 0 { - currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) - } else { - if isRts { - currBuffer = C.rmr_rts_msg(m.context, txBuffer) - } else { - currBuffer = C.rmr_send_msg(m.context, txBuffer) - } - } - m.contextMux.Unlock() - - if currBuffer == nil { - m.UpdateStatCounter("TransmitError") - return m.LogMBufError("SendBuf failed", txBuffer) - } // Just quick retry seems to help for K8s issue if m.maxRetryOnFailure == 0 { m.maxRetryOnFailure = 5 } - for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ { + for j := 0; j <= m.maxRetryOnFailure; j++ { m.contextMux.Lock() if whid != 0 { - currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) + txBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) } else { if isRts { - currBuffer = C.rmr_rts_msg(m.context, txBuffer) + txBuffer = C.rmr_rts_msg(m.context, txBuffer) } else { - currBuffer = C.rmr_send_msg(m.context, txBuffer) + txBuffer = C.rmr_send_msg(m.context, txBuffer) } } m.contextMux.Unlock() + if j+1 <= m.maxRetryOnFailure && txBuffer != nil && txBuffer.state == C.RMR_ERR_RETRY { + m.UpdateStatCounter("TransmitRetry") + continue + } + break } - if currBuffer == nil { + if txBuffer == nil { m.UpdateStatCounter("TransmitError") - m.LogMBufError("SendBuf failed", currBuffer) + m.LogMBufError("SendBuf failed", txBuffer) return int(C.RMR_ERR_INITFAILED) } - if currBuffer.state != C.RMR_OK { + if txBuffer.state != C.RMR_OK { m.UpdateStatCounter("TransmitError") - m.LogMBufError("SendBuf failed", currBuffer) + m.LogMBufError("SendBuf failed", txBuffer) } else { m.UpdateStatCounter("Transmitted") } - defer m.Free(currBuffer) - return int(currBuffer.state) + defer m.Free(txBuffer) + return int(txBuffer.state) } @@ -545,12 +562,13 @@ func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool { func (m *RMRClient) UpdateStatCounter(name string) { m.mux.Lock() - m.stat[name].Inc() + m.statc[name].Inc() m.mux.Unlock() } func (m *RMRClient) RegisterMetrics() { - m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR") + m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR") + m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR") } func (m *RMRClient) Wait() {