General capabilities for xapps
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
index 90bc64b..9b91e1a 100755 (executable)
@@ -64,6 +64,8 @@ int wait_epoll(int epoll_fd,int rcv_fd) {
 import "C"
 
 import (
+       "bytes"
+       "crypto/md5"
        "fmt"
        "github.com/spf13/viper"
        "strings"
@@ -98,6 +100,9 @@ var RMRErrors = map[int]string{
        C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type RMRParams struct {
        Mtype      int
        Payload    []byte
@@ -113,6 +118,15 @@ type RMRParams struct {
        status     int
 }
 
+func (params *RMRParams) String() string {
+       var b bytes.Buffer
+       fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
+       return b.String()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
        p := C.CString(protPort)
        m := C.int(maxSize)
@@ -268,6 +282,25 @@ func (m *RMRClient) SendRts(params *RMRParams) bool {
        return m.Send(params, true)
 }
 
+func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
+       status := m.Send(params, isRts)
+       i := 0
+       for ; i < int(to)*2 && status == false; i++ {
+               status = m.Send(params, isRts)
+               if status == false {
+                       time.Sleep(500 * time.Millisecond)
+               }
+       }
+       if status == false {
+               err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
+               if params.Mbuf != nil {
+                       m.Free(params.Mbuf)
+                       params.Mbuf = nil
+               }
+       }
+       return
+}
+
 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
        if params.Mbuf != nil {
                m.Free(params.Mbuf)