From d5dc8e1377fbccfd25fef3a57fb19a6fb76c95bd Mon Sep 17 00:00:00 2001 From: Mohamed Abukar Date: Tue, 15 Oct 2019 09:58:17 +0300 Subject: [PATCH] Reduce the RMR sending retry Change-Id: I24f3e52edc4c17657168ff1b6e82f867e7b854c8 Signed-off-by: Mohamed Abukar --- pkg/xapp/rmr.go | 61 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index c7b0ff4..07ce55b 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -176,34 +176,14 @@ func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) { } func (m *RMRClient) SendMsg(params *RMRParams) bool { - return m.SendBuffer(params, false) + return m.Send(params, false) } func (m *RMRClient) SendRts(params *RMRParams) bool { - return m.SendBuffer(params, true) + return m.Send(params, true) } -func (m *RMRClient) SendBuffer(params *RMRParams, isRts bool) bool { - for i := 0; i < 10; i++ { - errCode := m.Send(params, isRts) - if errCode == C.RMR_OK { - m.Free(params.Mbuf) - m.UpdateStatCounter("Transmitted") - return true - } - if errCode != C.RMR_ERR_RETRY { - Logger.Error("rmrClient: rmr_send returned hard error - %d", errCode) - break - } - - } - - m.Free(params.Mbuf) - m.UpdateStatCounter("TransmitError") - return false -} - -func (m *RMRClient) Send(params *RMRParams, isRts bool) C.int { +func (m *RMRClient) Send(params *RMRParams, isRts bool) bool { txBuffer := params.Mbuf if txBuffer == nil { txBuffer = m.Allocate() @@ -233,18 +213,45 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) C.int { } C.write_bytes_array(txBuffer.payload, datap, txBuffer.len) + return m.SendBuf(txBuffer, isRts) +} + +func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool { + var ( + currBuffer *C.rmr_mbuf_t + state bool = true + counterName string = "Transmitted" + ) + txBuffer.state = 0 - currBuffer := txBuffer if isRts { currBuffer = C.rmr_rts_msg(m.context, txBuffer) } else { currBuffer = C.rmr_send_msg(m.context, txBuffer) } - if currBuffer != nil { - return currBuffer.state + if currBuffer == nil { + m.UpdateStatCounter("TransmitError") + return false + } + + // Just quick retry seems to help for K8s issue + for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ { + if isRts { + currBuffer = C.rmr_rts_msg(m.context, currBuffer) + } else { + currBuffer = C.rmr_send_msg(m.context, currBuffer) + } + } + + if currBuffer.state != C.RMR_OK { + state = false + counterName = "TransmitError" } - return -1 + + m.UpdateStatCounter(counterName) + m.Free(currBuffer) + return state } func (m *RMRClient) UpdateStatCounter(name string) { -- 2.16.6