feat(API): Add subscription id and source retrieval 67/67/5
authorE. Scott Daniels <daniels@research.att.com>
Fri, 19 Apr 2019 21:12:25 +0000 (21:12 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Mon, 22 Apr 2019 14:37:09 +0000 (14:37 +0000)
Subscription id is needed for applications which wish to route
messages based on subscription rather than message type. This
field is now available to user applications by the sub_id
field in the message.  Subscription based applications may also
need to have access to the message source, so a get source
function has been added.

Change-Id: I796392a6e3899e8f01a3292796e54e7abd56c32f
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
CMakeLists.txt
doc/CMakeLists.txt
doc/src/man/rmr.7.xfm
doc/src/man/rmr_get_src.3.xfm [new file with mode: 0644]
src/common/include/rmr.h
src/common/include/rmr_agnostic.h
src/common/src/mbuf_api.c
src/nanomsg/src/sr_static.c
src/nng/src/sr_nng_static.c
test/mbuf_api_static_test.c

index 6de24c5..2782147 100644 (file)
@@ -24,7 +24,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "17" )
+set( patch_level "18" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index 2dbca2b..ddd737f 100644 (file)
@@ -78,6 +78,7 @@ if( BUILD_DOC )
                rmr_set_trace.3
                rmr_tralloc_msg.3
                rmr_get_trlen.3
                rmr_set_trace.3
                rmr_tralloc_msg.3
                rmr_get_trlen.3
+               rmr_get_src.3
        )
 
        # empty list of roff/troff input files we generated
        )
 
        # empty list of roff/troff input files we generated
index eab9b9a..2ff5e6c 100644 (file)
@@ -1,6 +1,6 @@
 .if false
 ==================================================================================
 .if false
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
@@ -25,7 +25,7 @@
 
 .** if formatting with tfm, the roff.im will cause roff output to be generated
 .** and rst.im will cause rst to be generated depending on OUTPUT_TYPE env
 
 .** if formatting with tfm, the roff.im will cause roff output to be generated
 .** and rst.im will cause rst to be generated depending on OUTPUT_TYPE env
-.** var. 
+.** var.
 .** if formatting with pfm, then pretty postscript will be generated
 
 .gv e LIB lib
 .** if formatting with pfm, then pretty postscript will be generated
 
 .gv e LIB lib
@@ -33,7 +33,7 @@
        .im &{lib}/generic_ps.im
 .ei
        .gv e OUTPUT_RST use_rst
        .im &{lib}/generic_ps.im
 .ei
        .gv e OUTPUT_RST use_rst
-       .if .ev &use_rst 1 = 
+       .if .ev &use_rst 1 =
                .im &{lib}/rst.im
        .ei
                .im &{lib}/roff.im
                .im &{lib}/rst.im
        .ei
                .im &{lib}/roff.im
        RMr -- Ric Message Router Library
 
 &h2(DESCRIPTION)
        RMr -- Ric Message Router Library
 
 &h2(DESCRIPTION)
-RMr is a library which provides  a user application with the ability 
+RMr is a library which provides  a user application with the ability
 to send and receive messages to/from  other RMr based applications
 without having to understand the underlying messaging transport environment (e.g. Nanomsg)
 and without needing to know which other endpoint applications are currently
 available and accepting messages.
 To do this, RMr depends on a routing table generated by an external source.
 to send and receive messages to/from  other RMr based applications
 without having to understand the underlying messaging transport environment (e.g. Nanomsg)
 and without needing to know which other endpoint applications are currently
 available and accepting messages.
 To do this, RMr depends on a routing table generated by an external source.
-This table is used to determine the destination endpoint of each message sent by mapping the 
+This table is used to determine the destination endpoint of each message sent by mapping the
 message type T (supplied by the user application) to an endpoint entry.
 Once determined, the message is sent directly to the endpoint.
 The user application is unaware of which endpoint actually receives the
 message type T (supplied by the user application) to an endpoint entry.
 Once determined, the message is sent directly to the endpoint.
 The user application is unaware of which endpoint actually receives the
@@ -62,28 +62,28 @@ applications.
 
 &space
 RMr functions do provide for the ability to respond to the specific source
 
 &space
 RMr functions do provide for the ability to respond to the specific source
-instance of a message allowing for either a request response, or call 
-response relationship when needed.  
+instance of a message allowing for either a request response, or call
+response relationship when needed. 
 
 
 &h3(The Route Table)
 
 
 &h3(The Route Table)
-The library is supplied with a route table which maps message numbers to 
+The library is supplied with a route table which maps message numbers to
 endpoint groups such that each time a message of type T is sent, the message
 endpoint groups such that each time a message of type T is sent, the message
-is delivered to one member of each group associated with T. 
+is delivered to one member of each group associated with T.
 For example, message type 2 might route to two different groups where
 group A consists of worker1 and worker2, while group B consists only of
 logger1.
 
 &space
 It is the responsibility of the route table generator to know which endpoints
 For example, message type 2 might route to two different groups where
 group A consists of worker1 and worker2, while group B consists only of
 logger1.
 
 &space
 It is the responsibility of the route table generator to know which endpoints
-belong to which groups, and which groups accept which message types. 
+belong to which groups, and which groups accept which message types.
 Once understood, the route table generator publishes a table that is ingested
 by RMr and used for mapping messages to end points.
 
 &h3(Environment)
 Once understood, the route table generator publishes a table that is ingested
 by RMr and used for mapping messages to end points.
 
 &h3(Environment)
-To enable configuration of the library behaviour outside of direct user application 
-control, RMr supports a number of environment variables which provide information 
-to the library.  
+To enable configuration of the library behaviour outside of direct user application
+control, RMr supports a number of environment variables which provide information
+to the library. 
 The following is a list of the various environment variables, what they control
 and the defaults which RMr uses if undefined.
 
 The following is a list of the various environment variables, what they control
 and the defaults which RMr uses if undefined.
 
@@ -93,18 +93,18 @@ and the defaults which RMr uses if undefined.
        This should be the IP address assigned to the interface that RMr should listen
        on, and if not defined RMr will listen on all interfaces.
 
        This should be the IP address assigned to the interface that RMr should listen
        on, and if not defined RMr will listen on all interfaces.
 
-&di(RMR_RTG_SVC) RMr opens a TCP listen socket using the port defined by this 
+&di(RMR_RTG_SVC) RMr opens a TCP listen socket using the port defined by this
        environment variable and expects that the route table generator process
        environment variable and expects that the route table generator process
-       will connect to this port. 
+       will connect to this port.
        If not supplied the port 4561 is used.
 
 &di(RMR_RTG_ISRAW) Is set to 1 if the route table generator is sending "plain" messages
        (not using RMr to send messages, 0 if the rtg is using RMr to send. The default
        If not supplied the port 4561 is used.
 
 &di(RMR_RTG_ISRAW) Is set to 1 if the route table generator is sending "plain" messages
        (not using RMr to send messages, 0 if the rtg is using RMr to send. The default
-       is 1 as we don't expect the rtg to use RMr. 
+       is 1 as we don't expect the rtg to use RMr.
 
 
-&di(RMR_SEED_RT) This is used to supply a static route table which can be used for 
+&di(RMR_SEED_RT) This is used to supply a static route table which can be used for
        debugging, testing, or if no route table generator process is being used to
        debugging, testing, or if no route table generator process is being used to
-       supply the route table.  
+       supply the route table. 
        If not defined, no static table is used and RMr will not report &ital(ready)
        until a table is received.
 &end_dlist
        If not defined, no static table is used and RMr will not report &ital(ready)
        until a table is received.
 &end_dlist
@@ -118,6 +118,7 @@ rmr_call(3),
 rmr_free_msg(3),
 rmr_init(3),
 rmr_init_trace(3),
 rmr_free_msg(3),
 rmr_init(3),
 rmr_init_trace(3),
+rmr_get_src(3),
 rmr_get_trace(3),
 rmr_get_trlen(3),
 rmr_payload_size(3),
 rmr_get_trace(3),
 rmr_get_trlen(3),
 rmr_payload_size(3),
diff --git a/doc/src/man/rmr_get_src.3.xfm b/doc/src/man/rmr_get_src.3.xfm
new file mode 100644 (file)
index 0000000..5ea92f3
--- /dev/null
@@ -0,0 +1,112 @@
+.if false
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+.fi
+
+.if false
+       Mnemonic        rmr_get_src.xfm
+       Abstract        The manual page for the rmr_get_src function.
+       Author          E. Scott Daniels
+       Date            8 March 2019
+.fi
+
+.** if formatting with tfm, the roff.im will cause roff output to be generated
+.** if formatting with pfm, then pretty postscript will be generated
+.gv e LIB lib
+.if pfm
+       .im &{lib}/generic_ps.im
+.ei
+       .gv e OUTPUT_RST use_rst
+       .if .ev &use_rst 1 =
+               .im &{lib}/rst.im
+       .ei
+               .im &{lib}/roff.im
+       .fi
+.fi
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+       rmr_get_src
+
+&h2(SYNOPSIS)
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+unsigned char* rmr_get_src( rmr_mbuf_t* mbuf, unsigned char* dest )
+&ex_end
+
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_get_src) function will copy the &ital(source) information from the message to a buffer
+(dest) supplied by the user.
+In an RMr message, the source is the sender's information that is used for return to sender
+function calls, and is generally the hostname and port in the form &ital(name:port).
+The source might be an IP address port combination; the data is populated by the sending process
+and the only requirement is that it be capable of being used to start a TCP session with the
+sender.
+.sp
+
+The maximum size allowed by RMr is 64 bytes (including the nil string terminator), so the user
+must ensure that the destination buffer given is at least 64 bytes.
+
+&h2(RETURN VALUE)
+On success, a pointer to the destination buffer is given as a convenience to the user programme.
+On failure, a nil pointer is returned and the value of errno is set.
+
+&h2(ERRORS)
+If an error occurs, the value of the global variable &cw( errno ) will be set to one of
+the following with the indicated meaning.
+
+&beg_dlist(.75i : ^&bold_font )
+&half_space
+&di(EINVAL) The message, or an internal portion of the message, was corrupted or the pointer was invalid.
+&end_dilist
+
+
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_bytes2xact(3),
+rmr_bytes2meid(3),
+rmr_call(3),
+rmr_free_msg(3),
+rmr_get_rcvfd(3),
+rmr_payload_size(3),
+rmr_send_msg(3),
+rmr_rcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_fib(3),
+rmr_has_str(3),
+rmr_tokenise(3),
+rmr_mk_ring(3),
+rmr_ring_free(3),
+rmr_str2meid(3),
+rmr_str2xact(3),
+rmr_wh_open(3),
+rmr_wh_send_msg(3)
+.ju on
+
+
+.qu
+
index 15c8400..4e662ef 100644 (file)
@@ -74,11 +74,12 @@ extern "C" {
        into or out of their environment they dup it all, not just what we choose to expose.)
 */
 typedef struct {
        into or out of their environment they dup it all, not just what we choose to expose.)
 */
 typedef struct {
-       int             state;                          // state of processing
+       int     state;                                  // state of processing
        int     mtype;                                  // message type
        int     len;                                    // length of data in the payload (send or received)
        unsigned char* payload;         // transported data
        unsigned char* xaction;         // pointer to fixed length transaction id bytes
        int     mtype;                                  // message type
        int     len;                                    // length of data in the payload (send or received)
        unsigned char* payload;         // transported data
        unsigned char* xaction;         // pointer to fixed length transaction id bytes
+       int sub_id;                                     // subscription id
 
                                                                // these things are off limits to the user application
        void*   tp_buf;                         // underlying transport allocated pointer (e.g. nng message)    
 
                                                                // these things are off limits to the user application
        void*   tp_buf;                         // underlying transport allocated pointer (e.g. nng message)    
@@ -121,6 +122,7 @@ extern void rmr_bytes2payload( rmr_mbuf_t* mbuf, unsigned char const* src, int l
 extern int rmr_bytes2xact( rmr_mbuf_t* mbuf, unsigned char const* src, int len );
 extern void rmr_free_msg( rmr_mbuf_t* mbuf );
 extern unsigned char*  rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest );
 extern int rmr_bytes2xact( rmr_mbuf_t* mbuf, unsigned char const* src, int len );
 extern void rmr_free_msg( rmr_mbuf_t* mbuf );
 extern unsigned char*  rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest );
+extern unsigned char*  rmr_get_src( rmr_mbuf_t* mbuf, unsigned char* dest );
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* mbuf, int new_tr_size );
 extern int rmr_str2meid( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* mbuf, int new_tr_size );
 extern int rmr_str2meid( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str );
index c28ccb9..c25a535 100644 (file)
@@ -72,7 +72,10 @@ typedef struct uta_ctx  uta_ctx_t;
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
-#define DEF_TR_LEN             -1                              // use default trace data len from context
+#define DEF_TR_LEN             (-1)                    // use default trace data len from context
+
+#define UNSET_SUBID            (-1)                    // initial value on msg allocation indicating not set
+#define UNSET_MSGTYPE  (-1)
 
 // -- header length/offset macros must ensure network conversion ----
 #define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
 
 // -- header length/offset macros must ensure network conversion ----
 #define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
@@ -134,7 +137,7 @@ typedef struct {
        int32_t len1;                                           // length of the tracing data
        int32_t len2;                                           // length of data 1 (d1)
        int32_t len3;                                           // length of data 2 (d2)
        int32_t len1;                                           // length of the tracing data
        int32_t len2;                                           // length of data 1 (d1)
        int32_t len3;                                           // length of data 2 (d2)
-
+       int32_t sub_id;                                         // subscription id (-1 invalid)
 } uta_mhdr_t;
 
 
 } uta_mhdr_t;
 
 
index ef0c28f..4b6d218 100644 (file)
@@ -316,3 +316,28 @@ extern int rmr_get_trlen( rmr_mbuf_t* msg ) {
 
        return RMR_TR_LEN( hdr );
 }
 
        return RMR_TR_LEN( hdr );
 }
+
+/*
+       Returns the string in the source portion of the header. This is assumed to be
+       something that can be used for direct sends (hostname:port). Regardless, it
+       will be a nil terminated, ascii string with max of 64 characters including
+       the final nil. So, the user must ensure that dest is at least 64 bytes.
+
+       As a convenience, the pointer to dest is returned on success; nil on failure
+       with errno set.
+*/
+extern unsigned char* rmr_get_src( rmr_mbuf_t* msg, unsigned char* dest ) {
+       uta_mhdr_t*     hdr = NULL;
+
+       if( msg == NULL ) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       if( dest != NULL ) {
+               hdr = msg->header;
+               strcpy( dest, hdr->src );
+       }
+
+       return dest;
+}
index 0f59da2..1c2bec8 100644 (file)
@@ -100,6 +100,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
 
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
+       hdr->sub_id = htonl( UNSET_SUBID );
        SET_HDR_LEN( hdr );
        SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
        //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
        SET_HDR_LEN( hdr );
        SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
        //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
@@ -145,6 +146,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
 
        nm->mtype = old_msg->mtype;
        memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
 
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
        nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
        nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
@@ -208,6 +210,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
@@ -249,6 +252,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                        msg->len = msg->state - RMR_HDR_LEN( hdr );
                }
                msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
                        msg->len = msg->state - RMR_HDR_LEN( hdr );
                }
                msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
+               msg->sub_id = ntohl( hdr->sub_id );                                                             // capture and convert from network order to local order
                msg->state = RMR_OK;
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
                msg->payload = PAYLOAD_ADDR( msg->header );
                msg->state = RMR_OK;
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
                msg->payload = PAYLOAD_ADDR( msg->header );
@@ -284,7 +288,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
        if( msg->state >= 0 ) {
                msg->xaction = NULL;
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
        if( msg->state >= 0 ) {
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
                msg->len = msg->state;                                                                          // no header; len is the entire thing received
                msg->state = RMR_OK;
                msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
                msg->len = msg->state;                                                                          // no header; len is the entire thing received
                msg->state = RMR_OK;
                msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
@@ -295,7 +300,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->state = RMR_ERR_EMPTY;
                msg->payload = NULL;
                msg->xaction = NULL;
                msg->state = RMR_ERR_EMPTY;
                msg->payload = NULL;
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
        }
 
        return msg;
        }
 
        return msg;
@@ -318,7 +324,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
-       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
        hdr->plen = htonl( msg->len );
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
        hdr->plen = htonl( msg->len );
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
index 3592a7b..c7bd049 100644 (file)
@@ -139,6 +139,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
        if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
                hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
        memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
        if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
                hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
+               hdr->sub_id = htonl( UNSET_SUBID );
                SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
                SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
                //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
                SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
                SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
                //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
@@ -225,6 +226,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
                        msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
                        msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
+                       msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
                        msg->state = RMR_OK;
                        hlen = sizeof( uta_v1mhdr_t );
                        break;
                        msg->state = RMR_OK;
                        hlen = sizeof( uta_v1mhdr_t );
                        break;
@@ -238,6 +240,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
                        msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
                        msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
+                       msg->sub_id = ntohl( hdr->sub_id );
                        hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
                        break;
        }
                        hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
                        break;
        }
@@ -290,6 +293,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
@@ -362,6 +366,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
                
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
@@ -444,7 +449,8 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->payload = NULL;
                msg->xaction = NULL;
                msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
                msg->payload = NULL;
                msg->xaction = NULL;
                msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
        }
 
        return msg;
        }
 
        return msg;
@@ -480,7 +486,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->header = nng_msg_body( msg->tp_buf );
        msg->len = rsize;                                                       // len is the number of bytes received
        msg->alloc_len = rsize;
        msg->header = nng_msg_body( msg->tp_buf );
        msg->len = rsize;                                                       // len is the number of bytes received
        msg->alloc_len = rsize;
-       msg->mtype = -1;                                                        // raw message has no type
+       msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
+       msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
        msg->state = RMR_OK;
        msg->flags = MFL_RAW;
        msg->payload = msg->header;                                     // payload is the whole thing; no header
        msg->state = RMR_OK;
        msg->flags = MFL_RAW;
        msg->payload = msg->header;                                     // payload is the whole thing; no header
@@ -511,7 +518,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
-       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
        hdr->plen = htonl( msg->len );
        tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        hdr->plen = htonl( msg->len );
        tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
index 8598136..f232b3e 100644 (file)
@@ -1,7 +1,7 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-        Copyright (c) 2019 Nokia 
+        Copyright (c) 2019 Nokia
         Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
         Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,7 +21,7 @@
 /*
        Mmemonic:       mbuf_api_static_test.c
        Abstract:       Test the message buffer  funcitons. These are meant to be included at compile
 /*
        Mmemonic:       mbuf_api_static_test.c
        Abstract:       Test the message buffer  funcitons. These are meant to be included at compile
-                               time by the test driver.  
+                               time by the test driver.
 
        Author:         E. Scott Daniels
        Date:           3 April 2019
 
        Author:         E. Scott Daniels
        Date:           3 April 2019
@@ -44,6 +44,7 @@ int mbuf_api_test( ) {
        int i;
        int state;
        int errors = 0;
        int i;
        int state;
        int errors = 0;
+       char*   buf;
        rmr_mbuf_t*     mbuf;
        unsigned char src_buf[256];
 
        rmr_mbuf_t*     mbuf;
        unsigned char src_buf[256];
 
@@ -131,7 +132,7 @@ int mbuf_api_test( ) {
        errno = 0;
        c = rmr_get_meid( NULL, NULL );
        errors += fail_if( c != NULL, "get meid with nil message buffer" );
        errno = 0;
        c = rmr_get_meid( NULL, NULL );
        errors += fail_if( c != NULL, "get meid with nil message buffer" );
-       errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" ); 
+       errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" );
        
        c = rmr_get_meid( mbuf, NULL );                 // should allocate and return c
        errors += fail_if( c == NULL, "get meid with nil dest pointer (did not allocate a buffer)" );
        
        c = rmr_get_meid( mbuf, NULL );                 // should allocate and return c
        errors += fail_if( c == NULL, "get meid with nil dest pointer (did not allocate a buffer)" );
@@ -248,6 +249,20 @@ int mbuf_api_test( ) {
        errors+= fail_not_equal( state, 0, "compare of pulled trace info did not match" );
 
        i = rmr_get_trlen( mbuf );
        errors+= fail_not_equal( state, 0, "compare of pulled trace info did not match" );
 
        i = rmr_get_trlen( mbuf );
+
+
+       // ------------- source field tests ------------------------------------------------------------
+       // we cannot force anything into the message source field, so no way to test the content, but we
+       // can test pointers and expected nils
+
+       buf = rmr_get_src( NULL, src_buf );                                     // coverage test for nil msg check
+       errors += fail_not_nil( buf, "rmr_get_src returned a pointer when given a nil message" );
+
+       buf = rmr_get_src( mbuf, NULL );                                        // coverage test for nil dst check
+       errors += fail_not_nil( buf, "rmr_get_src returned a pointer when given a nil dest buffer" );
+
+       buf = rmr_get_src( mbuf, src_buf );
+       errors += fail_not_equal( buf, src_buf, "rmr_get_src didn't return expexted buffer pointer" );
        
 
        return errors > 0;                      // overall exit code bad if errors
        
 
        return errors > 0;                      // overall exit code bad if errors