feat(API): Add subscription id and source retrieval
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
index 3592a7b..c7bd049 100644 (file)
@@ -139,6 +139,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
        if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
                hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
+               hdr->sub_id = htonl( UNSET_SUBID );
                SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
                SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
                //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
@@ -225,6 +226,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
                        msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
+                       msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
                        msg->state = RMR_OK;
                        hlen = sizeof( uta_v1mhdr_t );
                        break;
@@ -238,6 +240,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
                        msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
+                       msg->sub_id = ntohl( hdr->sub_id );
                        hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
                        break;
        }
@@ -290,6 +293,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
@@ -362,6 +366,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
@@ -444,7 +449,8 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->payload = NULL;
                msg->xaction = NULL;
                msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
        }
 
        return msg;
@@ -480,7 +486,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->header = nng_msg_body( msg->tp_buf );
        msg->len = rsize;                                                       // len is the number of bytes received
        msg->alloc_len = rsize;
-       msg->mtype = -1;                                                        // raw message has no type
+       msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
+       msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
        msg->state = RMR_OK;
        msg->flags = MFL_RAW;
        msg->payload = msg->header;                                     // payload is the whole thing; no header
@@ -511,7 +518,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
-       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
        hdr->plen = htonl( msg->len );
        tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send