Reduce the RMR sending retry 51/1151/1 v0.0.19
authorMohamed Abukar <abukar.mohamed@nokia.com>
Tue, 15 Oct 2019 06:58:17 +0000 (09:58 +0300)
committerMohamed Abukar <abukar.mohamed@nokia.com>
Tue, 15 Oct 2019 06:58:24 +0000 (09:58 +0300)
Change-Id: I24f3e52edc4c17657168ff1b6e82f867e7b854c8
Signed-off-by: Mohamed Abukar <abukar.mohamed@nokia.com>
pkg/xapp/rmr.go

index c7b0ff4..07ce55b 100755 (executable)
@@ -176,34 +176,14 @@ func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
 }
 
 func (m *RMRClient) SendMsg(params *RMRParams) bool {
-       return m.SendBuffer(params, false)
+       return m.Send(params, false)
 }
 
 func (m *RMRClient) SendRts(params *RMRParams) bool {
-       return m.SendBuffer(params, true)
+       return m.Send(params, true)
 }
 
-func (m *RMRClient) SendBuffer(params *RMRParams, isRts bool) bool {
-       for i := 0; i < 10; i++ {
-               errCode := m.Send(params, isRts)
-               if errCode == C.RMR_OK {
-                       m.Free(params.Mbuf)
-                       m.UpdateStatCounter("Transmitted")
-                       return true
-               }
-               if errCode != C.RMR_ERR_RETRY {
-                       Logger.Error("rmrClient: rmr_send returned hard error - %d", errCode)
-                       break
-               }
-
-       }
-
-       m.Free(params.Mbuf)
-       m.UpdateStatCounter("TransmitError")
-       return false
-}
-
-func (m *RMRClient) Send(params *RMRParams, isRts bool) C.int {
+func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
        txBuffer := params.Mbuf
        if txBuffer == nil {
                txBuffer = m.Allocate()
@@ -233,18 +213,45 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) C.int {
        }
        C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
 
+       return m.SendBuf(txBuffer, isRts)
+}
+
+func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
+       var (
+               currBuffer  *C.rmr_mbuf_t
+               state       bool   = true
+               counterName string = "Transmitted"
+       )
+
        txBuffer.state = 0
-       currBuffer := txBuffer
        if isRts {
                currBuffer = C.rmr_rts_msg(m.context, txBuffer)
        } else {
                currBuffer = C.rmr_send_msg(m.context, txBuffer)
        }
 
-       if currBuffer != nil {
-               return currBuffer.state
+       if currBuffer == nil {
+               m.UpdateStatCounter("TransmitError")
+               return false
+       }
+
+       // Just quick retry seems to help for K8s issue
+       for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
+               if isRts {
+                       currBuffer = C.rmr_rts_msg(m.context, currBuffer)
+               } else {
+                       currBuffer = C.rmr_send_msg(m.context, currBuffer)
+               }
+       }
+
+       if currBuffer.state != C.RMR_OK {
+               state = false
+               counterName = "TransmitError"
        }
-       return -1
+
+       m.UpdateStatCounter(counterName)
+       m.Free(currBuffer)
+       return state
 }
 
 func (m *RMRClient) UpdateStatCounter(name string) {