feat(API): Add subscription id and source retrieval
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
index 0f59da2..1c2bec8 100644 (file)
@@ -100,6 +100,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
+       hdr->sub_id = htonl( UNSET_SUBID );
        SET_HDR_LEN( hdr );
        SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
        //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
@@ -145,6 +146,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
 
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
        nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
@@ -208,6 +210,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
@@ -249,6 +252,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                        msg->len = msg->state - RMR_HDR_LEN( hdr );
                }
                msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
+               msg->sub_id = ntohl( hdr->sub_id );                                                             // capture and convert from network order to local order
                msg->state = RMR_OK;
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
                msg->payload = PAYLOAD_ADDR( msg->header );
@@ -284,7 +288,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
        if( msg->state >= 0 ) {
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
                msg->len = msg->state;                                                                          // no header; len is the entire thing received
                msg->state = RMR_OK;
                msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
@@ -295,7 +300,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->state = RMR_ERR_EMPTY;
                msg->payload = NULL;
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
        }
 
        return msg;
@@ -318,7 +324,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
-       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
        hdr->plen = htonl( msg->len );
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source