import "C"
import (
+ "bytes"
+ "crypto/md5"
"fmt"
"github.com/spf13/viper"
"strings"
C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
}
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
type RMRParams struct {
Mtype int
Payload []byte
status int
}
+func (params *RMRParams) String() string {
+ var b bytes.Buffer
+ fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
+ return b.String()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
p := C.CString(protPort)
m := C.int(maxSize)
return m.Send(params, true)
}
+func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
+ status := m.Send(params, isRts)
+ i := 0
+ for ; i < int(to)*2 && status == false; i++ {
+ status = m.Send(params, isRts)
+ if status == false {
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+ if status == false {
+ err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
+ if params.Mbuf != nil {
+ m.Free(params.Mbuf)
+ params.Mbuf = nil
+ }
+ }
+ return
+}
+
func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
if params.Mbuf != nil {
m.Free(params.Mbuf)