Preserve the message order
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
index 44f0178..d310b3a 100755 (executable)
@@ -82,6 +82,8 @@ type RMRParams struct {
        Src        string
        Mbuf       *C.rmr_mbuf_t
        Whid       int
+       Callid     int
+       Timeout    int
        status     int
 }
 
@@ -153,11 +155,14 @@ func (m *RMRClient) Worker(taskName string, msgSize int) {
                }
                m.UpdateStatCounter("Received")
 
+               m.msgWg.Add(1)
                go m.parseMessage(rxBuffer)
+               m.msgWg.Wait()
        }
 }
 
 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
+       defer m.msgWg.Done()
        if len(m.consumers) == 0 {
                Logger.Info("rmrClient: No message handlers defined, message discarded!")
                return
@@ -210,8 +215,8 @@ func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
        }
 }
 
-func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
-       buf := C.rmr_alloc_msg(m.context, 0)
+func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
+       buf := C.rmr_alloc_msg(m.context, C.int(size))
        if buf == nil {
                Logger.Error("rmrClient: Allocating message buffer failed!")
        }
@@ -233,21 +238,25 @@ func (m *RMRClient) SendRts(params *RMRParams) bool {
        return m.Send(params, true)
 }
 
-func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
-       txBuffer := params.Mbuf
-       if txBuffer == nil {
-               txBuffer = m.Allocate()
-               if txBuffer == nil {
-                       return false
-               }
+func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
+       if params.Mbuf != nil {
+               m.Free(params.Mbuf)
+               params.Mbuf = nil
        }
 
-       txBuffer.mtype = C.int(params.Mtype)
-       txBuffer.sub_id = C.int(params.SubId)
-       txBuffer.len = C.int(len(params.Payload))
+       payLen := len(params.Payload)
        if params.PayloadLen != 0 {
-               txBuffer.len = C.int(params.PayloadLen)
+               payLen = params.PayloadLen
        }
+
+       txBuffer := m.Allocate(payLen)
+       if txBuffer == nil {
+               return nil
+       }
+       txBuffer.mtype = C.int(params.Mtype)
+       txBuffer.sub_id = C.int(params.SubId)
+       txBuffer.len = C.int(payLen)
+
        datap := C.CBytes(params.Payload)
        defer C.free(datap)
 
@@ -265,7 +274,15 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
                }
        }
        C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
+       return txBuffer
+}
+
+func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
 
+       txBuffer := m.CopyBuffer(params)
+       if txBuffer == nil {
+               return false
+       }
        params.status = m.SendBuf(txBuffer, isRts, params.Whid)
        if params.status == int(C.RMR_OK) {
                return true
@@ -324,6 +341,39 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
        return int(currBuffer.state)
 }
 
+func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
+       var (
+               currBuffer  *C.rmr_mbuf_t
+               counterName string = "Transmitted"
+       )
+       txBuffer := m.CopyBuffer(params)
+       if txBuffer == nil {
+               return C.RMR_ERR_INITFAILED, ""
+       }
+
+       txBuffer.state = 0
+
+       currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
+
+       if currBuffer == nil {
+               m.UpdateStatCounter("TransmitError")
+               return m.LogMBufError("SendBuf failed", txBuffer), ""
+       }
+
+       if currBuffer.state != C.RMR_OK {
+               counterName = "TransmitError"
+               m.LogMBufError("SendBuf failed", currBuffer)
+       }
+
+       m.UpdateStatCounter(counterName)
+       defer m.Free(currBuffer)
+
+       cptr := unsafe.Pointer(currBuffer.payload)
+       payload := C.GoBytes(cptr, C.int(currBuffer.len))
+
+       return int(currBuffer.state), string(payload)
+}
+
 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
        return m.Wh_open(target)
 }