import "C"
import (
+ "bytes"
+ "crypto/md5"
"fmt"
"github.com/spf13/viper"
"strings"
C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
}
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
type RMRParams struct {
Mtype int
Payload []byte
status int
}
+func (params *RMRParams) String() string {
+ var b bytes.Buffer
+ fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
+ return b.String()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
p := C.CString(protPort)
m := C.int(maxSize)
func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
m.contextMux.Lock()
defer m.contextMux.Unlock()
- buf := C.rmr_alloc_msg(m.context, C.int(size))
- if buf == nil {
+ outbuf := C.rmr_alloc_msg(m.context, C.int(size))
+ if outbuf == nil {
+ Logger.Error("rmrClient: Allocating message buffer failed!")
+ }
+ return outbuf
+}
+
+func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
+ m.contextMux.Lock()
+ defer m.contextMux.Unlock()
+ outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
+ if outbuf == nil {
Logger.Error("rmrClient: Allocating message buffer failed!")
}
- return buf
+ return outbuf
}
func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
return m.Send(params, true)
}
-func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
- if params.Mbuf != nil {
- m.Free(params.Mbuf)
- params.Mbuf = nil
+func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
+ status := m.Send(params, isRts)
+ i := 0
+ for ; i < int(to)*2 && status == false; i++ {
+ status = m.Send(params, isRts)
+ if status == false {
+ time.Sleep(500 * time.Millisecond)
+ }
}
+ if status == false {
+ err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
+ if params.Mbuf != nil {
+ m.Free(params.Mbuf)
+ params.Mbuf = nil
+ }
+ }
+ return
+}
+
+func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
payLen := len(params.Payload)
if params.PayloadLen != 0 {
payLen = params.PayloadLen
}
- txBuffer := m.Allocate(payLen)
+ txBuffer := params.Mbuf
+ params.Mbuf = nil
+
+ if txBuffer != nil {
+ txBuffer = m.ReAllocate(txBuffer, payLen)
+ } else {
+ txBuffer = m.Allocate(payLen)
+ }
+
if txBuffer == nil {
return nil
}