Some code and testing improvement
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
index 3a52c97..a25b340 100755 (executable)
@@ -81,6 +81,7 @@ type RMRParams struct {
        SubId      int
        Src        string
        Mbuf       *C.rmr_mbuf_t
+       status     int
 }
 
 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
@@ -247,13 +248,16 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
        }
        C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
 
-       return m.SendBuf(txBuffer, isRts)
+       params.status = m.SendBuf(txBuffer, isRts)
+       if params.status == int(C.RMR_OK) {
+               return true
+       }
+       return false
 }
 
-func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
+func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
        var (
                currBuffer  *C.rmr_mbuf_t
-               state       bool   = true
                counterName string = "Transmitted"
        )
 
@@ -270,7 +274,12 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
        }
 
        // Just quick retry seems to help for K8s issue
-       for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
+       maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
+       if maxRetryOnFailure == 0 {
+               maxRetryOnFailure = 5
+       }
+
+       for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
                if isRts {
                        currBuffer = C.rmr_rts_msg(m.context, currBuffer)
                } else {
@@ -280,12 +289,27 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
 
        if currBuffer.state != C.RMR_OK {
                counterName = "TransmitError"
-               state = m.LogMBufError("SendBuf failed", currBuffer)
+               m.LogMBufError("SendBuf failed", currBuffer)
        }
 
        m.UpdateStatCounter(counterName)
        m.Free(currBuffer)
-       return state
+
+       return int(currBuffer.state)
+}
+
+func (m *RMRClient) IsRetryError(params *RMRParams) bool {
+       if params.status == int(C.RMR_ERR_RETRY) {
+               return true
+       }
+       return false
+}
+
+func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
+       if params.status == int(C.RMR_ERR_NOENDPT) {
+               return true
+       }
+       return false
 }
 
 func (m *RMRClient) UpdateStatCounter(name string) {
@@ -325,9 +349,9 @@ func (m *RMRClient) GetRicMessageName(id int) (s string) {
        return
 }
 
-func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
+func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
        Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
-       return false
+       return int(mbuf.state)
 }
 
 // To be removed ...