X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Frmr.go;h=59df7436cf6c16623a3de18cbfbb56fc6695d1d5;hb=refs%2Ftags%2Fv0.9.19;hp=028cc3c9473aa18af2948b27409775063c729ddd;hpb=79f0680fd7bbf1c8a8c6e2a842cb18020e387a47;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index 028cc3c..59df743 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -29,6 +29,7 @@ package xapp #include #include + void write_bytes_array(unsigned char *dst, void *data, int len) { memcpy((void *)dst, (void *)data, len); } @@ -108,9 +109,9 @@ var RMRErrors = map[int]string{ C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request", } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- type RMRParams struct { Mtype int Payload []byte @@ -132,9 +133,9 @@ func (params *RMRParams) String() string { return b.String() } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- type RMRClientParams struct { StatDesc string RmrData PortData @@ -146,9 +147,9 @@ func (params *RMRClientParams) String() string { params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies) } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { p := C.CString(fmt.Sprintf("%d", params.RmrData.Port)) m := C.int(params.RmrData.MaxSize) @@ -446,62 +447,46 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool { } func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int { - var ( - currBuffer *C.rmr_mbuf_t - ) - - m.contextMux.Lock() txBuffer.state = 0 - if whid != 0 { - currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) - } else { - if isRts { - currBuffer = C.rmr_rts_msg(m.context, txBuffer) - } else { - currBuffer = C.rmr_send_msg(m.context, txBuffer) - } - } - m.contextMux.Unlock() - - if currBuffer == nil { - m.UpdateStatCounter("TransmitError") - return m.LogMBufError("SendBuf failed", txBuffer) - } // Just quick retry seems to help for K8s issue if m.maxRetryOnFailure == 0 { m.maxRetryOnFailure = 5 } - for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ { + for j := 0; j <= m.maxRetryOnFailure; j++ { m.contextMux.Lock() if whid != 0 { - currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) + txBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) } else { if isRts { - currBuffer = C.rmr_rts_msg(m.context, txBuffer) + txBuffer = C.rmr_rts_msg(m.context, txBuffer) } else { - currBuffer = C.rmr_send_msg(m.context, txBuffer) + txBuffer = C.rmr_send_msg(m.context, txBuffer) } } m.contextMux.Unlock() - m.UpdateStatCounter("TransmitRetry") + if j+1 <= m.maxRetryOnFailure && txBuffer != nil && txBuffer.state == C.RMR_ERR_RETRY { + m.UpdateStatCounter("TransmitRetry") + continue + } + break } - if currBuffer == nil { + if txBuffer == nil { m.UpdateStatCounter("TransmitError") - m.LogMBufError("SendBuf failed", currBuffer) + m.LogMBufError("SendBuf failed", txBuffer) return int(C.RMR_ERR_INITFAILED) } - if currBuffer.state != C.RMR_OK { + if txBuffer.state != C.RMR_OK { m.UpdateStatCounter("TransmitError") - m.LogMBufError("SendBuf failed", currBuffer) + m.LogMBufError("SendBuf failed", txBuffer) } else { m.UpdateStatCounter("Transmitted") } - defer m.Free(currBuffer) - return int(currBuffer.state) + defer m.Free(txBuffer) + return int(txBuffer.state) }