SubId int
Src string
Mbuf *C.rmr_mbuf_t
+ status int
}
func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
}
C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
- return m.SendBuf(txBuffer, isRts)
+ params.status = m.SendBuf(txBuffer, isRts)
+ if params.status == int(C.RMR_OK) {
+ return true
+ }
+ return false
}
-func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
+func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
var (
currBuffer *C.rmr_mbuf_t
- state bool = true
counterName string = "Transmitted"
)
}
// Just quick retry seems to help for K8s issue
- for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
+ maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
+ if maxRetryOnFailure == 0 {
+ maxRetryOnFailure = 5
+ }
+
+ for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
if isRts {
currBuffer = C.rmr_rts_msg(m.context, currBuffer)
} else {
if currBuffer.state != C.RMR_OK {
counterName = "TransmitError"
- state = m.LogMBufError("SendBuf failed", currBuffer)
+ m.LogMBufError("SendBuf failed", currBuffer)
}
m.UpdateStatCounter(counterName)
m.Free(currBuffer)
- return state
+
+ return int(currBuffer.state)
+}
+
+func (m *RMRClient) IsRetryError(params *RMRParams) bool {
+ if params.status == int(C.RMR_ERR_RETRY) {
+ return true
+ }
+ return false
+}
+
+func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
+ if params.status == int(C.RMR_ERR_NOENDPT) {
+ return true
+ }
+ return false
}
func (m *RMRClient) UpdateStatCounter(name string) {
return
}
-func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
+func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
- return false
+ return int(mbuf.state)
}
// To be removed ...