}
func (m *RMRClient) SendMsg(params *RMRParams) bool {
- return m.Send(params, false)
+ return m.SendBuffer(params, false)
}
func (m *RMRClient) SendRts(params *RMRParams) bool {
- return m.Send(params, true)
+ return m.SendBuffer(params, true)
}
-func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
- buf := params.Mbuf
- if buf == nil {
- buf = m.Allocate()
+func (m *RMRClient) SendBuffer(params *RMRParams, isRts bool) bool {
+ defer m.Free(params.Mbuf)
+ for i := 0; i < 10; i++ {
+ errCode := m.Send(params, isRts)
+ if errCode == C.RMR_OK {
+ m.UpdateStatCounter("Transmitted")
+ return true
+ }
+ if errCode != C.RMR_ERR_RETRY {
+ Logger.Error("rmrClient: rmr_send returned hard error - %d", errCode)
+ break
+ }
+
}
+ m.UpdateStatCounter("TransmitError")
+ return false
+}
- buf.mtype = C.int(params.Mtype)
- buf.sub_id = C.int(params.SubId)
- buf.len = C.int(len(params.Payload))
+func (m *RMRClient) Send(params *RMRParams, isRts bool) C.int {
+ txBuffer := params.Mbuf
+ if txBuffer == nil {
+ txBuffer = m.Allocate()
+ }
+
+ txBuffer.mtype = C.int(params.Mtype)
+ txBuffer.sub_id = C.int(params.SubId)
+ txBuffer.len = C.int(len(params.Payload))
if params.PayloadLen != 0 {
- buf.len = C.int(params.PayloadLen)
+ txBuffer.len = C.int(params.PayloadLen)
}
datap := C.CBytes(params.Payload)
defer C.free(datap)
if params.Meid != nil {
b := make([]byte, int(C.RMR_MAX_MEID))
copy(b, []byte(params.Meid.RanName))
- C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
+ C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
}
xidLen := len(params.Xid)
if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
b := make([]byte, int(C.RMR_MAX_MEID))
copy(b, []byte(params.Xid))
- C.rmr_bytes2xact(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
+ C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
}
}
- C.write_bytes_array(buf.payload, datap, buf.len)
-
- return m.SendBuf(buf, isRts)
-}
-
-func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
- defer C.rmr_free_msg(txBuffer)
- var currBuffer *C.rmr_mbuf_t
-
- for i := 0; i < 10; i++ {
- txBuffer.state = 0
- if isRts {
- currBuffer = C.rmr_rts_msg(m.context, txBuffer)
- } else {
- currBuffer = C.rmr_send_msg(m.context, txBuffer)
- }
-
- if currBuffer == nil {
- break
- } else if currBuffer.state != C.RMR_OK {
- if currBuffer.state != C.RMR_ERR_RETRY {
- time.Sleep(100 * time.Microsecond)
- m.UpdateStatCounter("TransmitError")
- }
- for j := 0; j < 100 && currBuffer.state == C.RMR_ERR_RETRY; j++ {
- currBuffer = C.rmr_send_msg(m.context, txBuffer)
- }
- }
+ C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
+
+ txBuffer.state = 0
+ currBuffer := txBuffer
+ if isRts {
+ currBuffer = C.rmr_rts_msg(m.context, txBuffer)
+ } else {
+ currBuffer = C.rmr_send_msg(m.context, txBuffer)
+ }
- if currBuffer.state == C.RMR_OK {
- m.UpdateStatCounter("Transmitted")
- return true
- }
+ if currBuffer != nil {
+ return currBuffer.state
}
- m.UpdateStatCounter("TransmitError")
- return false
+ return -1
}
func (m *RMRClient) UpdateStatCounter(name string) {