DCAPTERM messagetypes
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
index 0aeced2..d310b3a 100755 (executable)
@@ -155,11 +155,14 @@ func (m *RMRClient) Worker(taskName string, msgSize int) {
                }
                m.UpdateStatCounter("Received")
 
+               m.msgWg.Add(1)
                go m.parseMessage(rxBuffer)
+               m.msgWg.Wait()
        }
 }
 
 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
+       defer m.msgWg.Done()
        if len(m.consumers) == 0 {
                Logger.Info("rmrClient: No message handlers defined, message discarded!")
                return
@@ -212,8 +215,8 @@ func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
        }
 }
 
-func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
-       buf := C.rmr_alloc_msg(m.context, 0)
+func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
+       buf := C.rmr_alloc_msg(m.context, C.int(size))
        if buf == nil {
                Logger.Error("rmrClient: Allocating message buffer failed!")
        }
@@ -236,20 +239,24 @@ func (m *RMRClient) SendRts(params *RMRParams) bool {
 }
 
 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
-       txBuffer := params.Mbuf
-       if txBuffer == nil {
-               txBuffer = m.Allocate()
-               if txBuffer == nil {
-                       return nil
-               }
+       if params.Mbuf != nil {
+               m.Free(params.Mbuf)
+               params.Mbuf = nil
        }
 
-       txBuffer.mtype = C.int(params.Mtype)
-       txBuffer.sub_id = C.int(params.SubId)
-       txBuffer.len = C.int(len(params.Payload))
+       payLen := len(params.Payload)
        if params.PayloadLen != 0 {
-               txBuffer.len = C.int(params.PayloadLen)
+               payLen = params.PayloadLen
+       }
+
+       txBuffer := m.Allocate(payLen)
+       if txBuffer == nil {
+               return nil
        }
+       txBuffer.mtype = C.int(params.Mtype)
+       txBuffer.sub_id = C.int(params.SubId)
+       txBuffer.len = C.int(payLen)
+
        datap := C.CBytes(params.Payload)
        defer C.free(datap)