}
func (m *RMRClient) SendMsg(params *RMRParams) bool {
- return m.SendBuffer(params, false)
+ return m.Send(params, false)
}
func (m *RMRClient) SendRts(params *RMRParams) bool {
- return m.SendBuffer(params, true)
+ return m.Send(params, true)
}
-func (m *RMRClient) SendBuffer(params *RMRParams, isRts bool) bool {
- for i := 0; i < 10; i++ {
- errCode := m.Send(params, isRts)
- if errCode == C.RMR_OK {
- m.Free(params.Mbuf)
- m.UpdateStatCounter("Transmitted")
- return true
- }
- if errCode != C.RMR_ERR_RETRY {
- Logger.Error("rmrClient: rmr_send returned hard error - %d", errCode)
- break
- }
-
- }
-
- m.Free(params.Mbuf)
- m.UpdateStatCounter("TransmitError")
- return false
-}
-
-func (m *RMRClient) Send(params *RMRParams, isRts bool) C.int {
+func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
txBuffer := params.Mbuf
if txBuffer == nil {
txBuffer = m.Allocate()
}
C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
+ return m.SendBuf(txBuffer, isRts)
+}
+
+func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
+ var (
+ currBuffer *C.rmr_mbuf_t
+ state bool = true
+ counterName string = "Transmitted"
+ )
+
txBuffer.state = 0
- currBuffer := txBuffer
if isRts {
currBuffer = C.rmr_rts_msg(m.context, txBuffer)
} else {
currBuffer = C.rmr_send_msg(m.context, txBuffer)
}
- if currBuffer != nil {
- return currBuffer.state
+ if currBuffer == nil {
+ m.UpdateStatCounter("TransmitError")
+ return false
+ }
+
+ // Just quick retry seems to help for K8s issue
+ for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
+ if isRts {
+ currBuffer = C.rmr_rts_msg(m.context, currBuffer)
+ } else {
+ currBuffer = C.rmr_send_msg(m.context, currBuffer)
+ }
+ }
+
+ if currBuffer.state != C.RMR_OK {
+ state = false
+ counterName = "TransmitError"
}
- return -1
+
+ m.UpdateStatCounter(counterName)
+ m.Free(currBuffer)
+ return state
}
func (m *RMRClient) UpdateStatCounter(name string) {