From c1aee2b63a523a461e96a8d358c73dd8a9e9e6a2 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Fri, 19 Apr 2019 21:12:25 +0000 Subject: [PATCH] feat(API): Add subscription id and source retrieval Subscription id is needed for applications which wish to route messages based on subscription rather than message type. This field is now available to user applications by the sub_id field in the message. Subscription based applications may also need to have access to the message source, so a get source function has been added. Change-Id: I796392a6e3899e8f01a3292796e54e7abd56c32f Signed-off-by: E. Scott Daniels --- CMakeLists.txt | 2 +- doc/CMakeLists.txt | 1 + doc/src/man/rmr.7.xfm | 37 +++++++------ doc/src/man/rmr_get_src.3.xfm | 112 ++++++++++++++++++++++++++++++++++++++ src/common/include/rmr.h | 4 +- src/common/include/rmr_agnostic.h | 7 ++- src/common/src/mbuf_api.c | 25 +++++++++ src/nanomsg/src/sr_static.c | 13 ++++- src/nng/src/sr_nng_static.c | 14 ++++- test/mbuf_api_static_test.c | 21 ++++++- 10 files changed, 205 insertions(+), 31 deletions(-) create mode 100644 doc/src/man/rmr_get_src.3.xfm diff --git a/CMakeLists.txt b/CMakeLists.txt index 6de24c5..2782147 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,7 +24,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "17" ) +set( patch_level "18" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt index 2dbca2b..ddd737f 100644 --- a/doc/CMakeLists.txt +++ b/doc/CMakeLists.txt @@ -78,6 +78,7 @@ if( BUILD_DOC ) rmr_set_trace.3 rmr_tralloc_msg.3 rmr_get_trlen.3 + rmr_get_src.3 ) # empty list of roff/troff input files we generated diff --git a/doc/src/man/rmr.7.xfm b/doc/src/man/rmr.7.xfm index eab9b9a..2ff5e6c 100644 --- a/doc/src/man/rmr.7.xfm +++ b/doc/src/man/rmr.7.xfm @@ -1,6 +1,6 @@ .if false ================================================================================== - Copyright (c) 2019 Nokia + Copyright (c) 2019 Nokia Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,7 +25,7 @@ .** if formatting with tfm, the roff.im will cause roff output to be generated .** and rst.im will cause rst to be generated depending on OUTPUT_TYPE env -.** var. +.** var. .** if formatting with pfm, then pretty postscript will be generated .gv e LIB lib @@ -33,7 +33,7 @@ .im &{lib}/generic_ps.im .ei .gv e OUTPUT_RST use_rst - .if .ev &use_rst 1 = + .if .ev &use_rst 1 = .im &{lib}/rst.im .ei .im &{lib}/roff.im @@ -47,13 +47,13 @@ RMr -- Ric Message Router Library &h2(DESCRIPTION) -RMr is a library which provides a user application with the ability +RMr is a library which provides a user application with the ability to send and receive messages to/from other RMr based applications without having to understand the underlying messaging transport environment (e.g. Nanomsg) and without needing to know which other endpoint applications are currently available and accepting messages. To do this, RMr depends on a routing table generated by an external source. -This table is used to determine the destination endpoint of each message sent by mapping the +This table is used to determine the destination endpoint of each message sent by mapping the message type T (supplied by the user application) to an endpoint entry. Once determined, the message is sent directly to the endpoint. The user application is unaware of which endpoint actually receives the @@ -62,28 +62,28 @@ applications. &space RMr functions do provide for the ability to respond to the specific source -instance of a message allowing for either a request response, or call -response relationship when needed. +instance of a message allowing for either a request response, or call +response relationship when needed. &h3(The Route Table) -The library is supplied with a route table which maps message numbers to +The library is supplied with a route table which maps message numbers to endpoint groups such that each time a message of type T is sent, the message -is delivered to one member of each group associated with T. +is delivered to one member of each group associated with T. For example, message type 2 might route to two different groups where group A consists of worker1 and worker2, while group B consists only of logger1. &space It is the responsibility of the route table generator to know which endpoints -belong to which groups, and which groups accept which message types. +belong to which groups, and which groups accept which message types. Once understood, the route table generator publishes a table that is ingested by RMr and used for mapping messages to end points. &h3(Environment) -To enable configuration of the library behaviour outside of direct user application -control, RMr supports a number of environment variables which provide information -to the library. +To enable configuration of the library behaviour outside of direct user application +control, RMr supports a number of environment variables which provide information +to the library. The following is a list of the various environment variables, what they control and the defaults which RMr uses if undefined. @@ -93,18 +93,18 @@ and the defaults which RMr uses if undefined. This should be the IP address assigned to the interface that RMr should listen on, and if not defined RMr will listen on all interfaces. -&di(RMR_RTG_SVC) RMr opens a TCP listen socket using the port defined by this +&di(RMR_RTG_SVC) RMr opens a TCP listen socket using the port defined by this environment variable and expects that the route table generator process - will connect to this port. + will connect to this port. If not supplied the port 4561 is used. &di(RMR_RTG_ISRAW) Is set to 1 if the route table generator is sending "plain" messages (not using RMr to send messages, 0 if the rtg is using RMr to send. The default - is 1 as we don't expect the rtg to use RMr. + is 1 as we don't expect the rtg to use RMr. -&di(RMR_SEED_RT) This is used to supply a static route table which can be used for +&di(RMR_SEED_RT) This is used to supply a static route table which can be used for debugging, testing, or if no route table generator process is being used to - supply the route table. + supply the route table. If not defined, no static table is used and RMr will not report &ital(ready) until a table is received. &end_dlist @@ -118,6 +118,7 @@ rmr_call(3), rmr_free_msg(3), rmr_init(3), rmr_init_trace(3), +rmr_get_src(3), rmr_get_trace(3), rmr_get_trlen(3), rmr_payload_size(3), diff --git a/doc/src/man/rmr_get_src.3.xfm b/doc/src/man/rmr_get_src.3.xfm new file mode 100644 index 0000000..5ea92f3 --- /dev/null +++ b/doc/src/man/rmr_get_src.3.xfm @@ -0,0 +1,112 @@ +.if false +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +.fi + +.if false + Mnemonic rmr_get_src.xfm + Abstract The manual page for the rmr_get_src function. + Author E. Scott Daniels + Date 8 March 2019 +.fi + +.** if formatting with tfm, the roff.im will cause roff output to be generated +.** if formatting with pfm, then pretty postscript will be generated +.gv e LIB lib +.if pfm + .im &{lib}/generic_ps.im +.ei + .gv e OUTPUT_RST use_rst + .if .ev &use_rst 1 = + .im &{lib}/rst.im + .ei + .im &{lib}/roff.im + .fi +.fi + +&line_len(6i) + +&h1(RMR Library Functions) +&h2(NAME) + rmr_get_src + +&h2(SYNOPSIS) +&indent +&ex_start +#include + +unsigned char* rmr_get_src( rmr_mbuf_t* mbuf, unsigned char* dest ) +&ex_end + +&uindent + +&h2(DESCRIPTION) +The &cw(rmr_get_src) function will copy the &ital(source) information from the message to a buffer +(dest) supplied by the user. +In an RMr message, the source is the sender's information that is used for return to sender +function calls, and is generally the hostname and port in the form &ital(name:port). +The source might be an IP address port combination; the data is populated by the sending process +and the only requirement is that it be capable of being used to start a TCP session with the +sender. +.sp + +The maximum size allowed by RMr is 64 bytes (including the nil string terminator), so the user +must ensure that the destination buffer given is at least 64 bytes. + +&h2(RETURN VALUE) +On success, a pointer to the destination buffer is given as a convenience to the user programme. +On failure, a nil pointer is returned and the value of errno is set. + +&h2(ERRORS) +If an error occurs, the value of the global variable &cw( errno ) will be set to one of +the following with the indicated meaning. + +&beg_dlist(.75i : ^&bold_font ) +&half_space +&di(EINVAL) The message, or an internal portion of the message, was corrupted or the pointer was invalid. +&end_dilist + + + +&h2(SEE ALSO ) +.ju off +rmr_alloc_msg(3), +rmr_bytes2xact(3), +rmr_bytes2meid(3), +rmr_call(3), +rmr_free_msg(3), +rmr_get_rcvfd(3), +rmr_payload_size(3), +rmr_send_msg(3), +rmr_rcv_msg(3), +rmr_rcv_specific(3), +rmr_rts_msg(3), +rmr_ready(3), +rmr_fib(3), +rmr_has_str(3), +rmr_tokenise(3), +rmr_mk_ring(3), +rmr_ring_free(3), +rmr_str2meid(3), +rmr_str2xact(3), +rmr_wh_open(3), +rmr_wh_send_msg(3) +.ju on + + +.qu + diff --git a/src/common/include/rmr.h b/src/common/include/rmr.h index 15c8400..4e662ef 100644 --- a/src/common/include/rmr.h +++ b/src/common/include/rmr.h @@ -74,11 +74,12 @@ extern "C" { into or out of their environment they dup it all, not just what we choose to expose.) */ typedef struct { - int state; // state of processing + int state; // state of processing int mtype; // message type int len; // length of data in the payload (send or received) unsigned char* payload; // transported data unsigned char* xaction; // pointer to fixed length transaction id bytes + int sub_id; // subscription id // these things are off limits to the user application void* tp_buf; // underlying transport allocated pointer (e.g. nng message) @@ -121,6 +122,7 @@ extern void rmr_bytes2payload( rmr_mbuf_t* mbuf, unsigned char const* src, int l extern int rmr_bytes2xact( rmr_mbuf_t* mbuf, unsigned char const* src, int len ); extern void rmr_free_msg( rmr_mbuf_t* mbuf ); extern unsigned char* rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest ); +extern unsigned char* rmr_get_src( rmr_mbuf_t* mbuf, unsigned char* dest ); extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* mbuf, int new_tr_size ); extern int rmr_str2meid( rmr_mbuf_t* mbuf, unsigned char const* str ); extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str ); diff --git a/src/common/include/rmr_agnostic.h b/src/common/include/rmr_agnostic.h index c28ccb9..c25a535 100644 --- a/src/common/include/rmr_agnostic.h +++ b/src/common/include/rmr_agnostic.h @@ -72,7 +72,10 @@ typedef struct uta_ctx uta_ctx_t; //#define DEF_RTG_MSGID "" // default to pick up all messages from rtg #define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on #define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications -#define DEF_TR_LEN -1 // use default trace data len from context +#define DEF_TR_LEN (-1) // use default trace data len from context + +#define UNSET_SUBID (-1) // initial value on msg allocation indicating not set +#define UNSET_MSGTYPE (-1) // -- header length/offset macros must ensure network conversion ---- #define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct @@ -134,7 +137,7 @@ typedef struct { int32_t len1; // length of the tracing data int32_t len2; // length of data 1 (d1) int32_t len3; // length of data 2 (d2) - + int32_t sub_id; // subscription id (-1 invalid) } uta_mhdr_t; diff --git a/src/common/src/mbuf_api.c b/src/common/src/mbuf_api.c index ef0c28f..4b6d218 100644 --- a/src/common/src/mbuf_api.c +++ b/src/common/src/mbuf_api.c @@ -316,3 +316,28 @@ extern int rmr_get_trlen( rmr_mbuf_t* msg ) { return RMR_TR_LEN( hdr ); } + +/* + Returns the string in the source portion of the header. This is assumed to be + something that can be used for direct sends (hostname:port). Regardless, it + will be a nil terminated, ascii string with max of 64 characters including + the final nil. So, the user must ensure that dest is at least 64 bytes. + + As a convenience, the pointer to dest is returned on success; nil on failure + with errno set. +*/ +extern unsigned char* rmr_get_src( rmr_mbuf_t* msg, unsigned char* dest ) { + uta_mhdr_t* hdr = NULL; + + if( msg == NULL ) { + errno = EINVAL; + return NULL; + } + + if( dest != NULL ) { + hdr = msg->header; + strcpy( dest, hdr->src ); + } + + return dest; +} diff --git a/src/nanomsg/src/sr_static.c b/src/nanomsg/src/sr_static.c index 0f59da2..1c2bec8 100644 --- a/src/nanomsg/src/sr_static.c +++ b/src/nanomsg/src/sr_static.c @@ -100,6 +100,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s hdr = (uta_mhdr_t *) msg->header; hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version + hdr->sub_id = htonl( UNSET_SUBID ); SET_HDR_LEN( hdr ); SET_HDR_TR_LEN( hdr, tr_len ); // set the actual length used //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas @@ -145,6 +146,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload nm->payload = PAYLOAD_ADDR( nm->header ); // reference the payload @@ -208,6 +210,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { // --- these are all version agnostic ----------------------------------- nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload @@ -249,6 +252,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->len = msg->state - RMR_HDR_LEN( hdr ); } msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order + msg->sub_id = ntohl( hdr->sub_id ); // capture and convert from network order to local order msg->state = RMR_OK; msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src msg->payload = PAYLOAD_ADDR( msg->header ); @@ -284,7 +288,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length if( msg->state >= 0 ) { msg->xaction = NULL; - msg->mtype = -1; + msg->mtype = UNSET_MSGTYPE; + msg->sub_id = UNSET_SUBID; msg->len = msg->state; // no header; len is the entire thing received msg->state = RMR_OK; msg->flags = MFL_RAW; // prevent any sending of this headerless buffer @@ -295,7 +300,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->state = RMR_ERR_EMPTY; msg->payload = NULL; msg->xaction = NULL; - msg->mtype = -1; + msg->mtype = UNSET_MSGTYPE; + msg->sub_id = UNSET_SUBID; } return msg; @@ -318,7 +324,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) { // future: ensure that application did not overrun the XID buffer; last byte must be 0 hdr = (uta_mhdr_t *) msg->header; - hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport + hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport + hdr->sub_id = htonl( msg->sub_id ); hdr->plen = htonl( msg->len ); if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source diff --git a/src/nng/src/sr_nng_static.c b/src/nng/src/sr_nng_static.c index 3592a7b..c7bd049 100644 --- a/src/nng/src/sr_nng_static.c +++ b/src/nng/src/sr_nng_static.c @@ -139,6 +139,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) { hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version + hdr->sub_id = htonl( UNSET_SUBID ); SET_HDR_LEN( hdr ); // ensure these are converted to net byte order SET_HDR_TR_LEN( hdr, ctx->trace_data_len ); //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them @@ -225,6 +226,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) { msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order + msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this msg->state = RMR_OK; hlen = sizeof( uta_v1mhdr_t ); break; @@ -238,6 +240,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) { msg->xaction = &hdr->xid[0]; // point at transaction id in header area msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order + msg->sub_id = ntohl( hdr->sub_id ); hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later break; } @@ -290,6 +293,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { // --- these are all version agnostic ----------------------------------- nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload @@ -362,6 +366,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { // --- these are all version agnostic ----------------------------------- nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload @@ -444,7 +449,8 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->payload = NULL; msg->xaction = NULL; msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message - msg->mtype = -1; + msg->mtype = UNSET_MSGTYPE; + msg->sub_id = UNSET_SUBID; } return msg; @@ -480,7 +486,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->header = nng_msg_body( msg->tp_buf ); msg->len = rsize; // len is the number of bytes received msg->alloc_len = rsize; - msg->mtype = -1; // raw message has no type + msg->mtype = UNSET_MSGTYPE; // raw message has no type + msg->sub_id = UNSET_SUBID; // nor a subscription id msg->state = RMR_OK; msg->flags = MFL_RAW; msg->payload = msg->header; // payload is the whole thing; no header @@ -511,7 +518,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock // future: ensure that application did not overrun the XID buffer; last byte must be 0 hdr = (uta_mhdr_t *) msg->header; - hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport + hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport + hdr->sub_id = htonl( msg->sub_id ); hdr->plen = htonl( msg->len ); tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send diff --git a/test/mbuf_api_static_test.c b/test/mbuf_api_static_test.c index 8598136..f232b3e 100644 --- a/test/mbuf_api_static_test.c +++ b/test/mbuf_api_static_test.c @@ -1,7 +1,7 @@ // : vi ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019 Nokia + Copyright (c) 2019 Nokia Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,7 +21,7 @@ /* Mmemonic: mbuf_api_static_test.c Abstract: Test the message buffer funcitons. These are meant to be included at compile - time by the test driver. + time by the test driver. Author: E. Scott Daniels Date: 3 April 2019 @@ -44,6 +44,7 @@ int mbuf_api_test( ) { int i; int state; int errors = 0; + char* buf; rmr_mbuf_t* mbuf; unsigned char src_buf[256]; @@ -131,7 +132,7 @@ int mbuf_api_test( ) { errno = 0; c = rmr_get_meid( NULL, NULL ); errors += fail_if( c != NULL, "get meid with nil message buffer" ); - errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" ); + errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" ); c = rmr_get_meid( mbuf, NULL ); // should allocate and return c errors += fail_if( c == NULL, "get meid with nil dest pointer (did not allocate a buffer)" ); @@ -248,6 +249,20 @@ int mbuf_api_test( ) { errors+= fail_not_equal( state, 0, "compare of pulled trace info did not match" ); i = rmr_get_trlen( mbuf ); + + + // ------------- source field tests ------------------------------------------------------------ + // we cannot force anything into the message source field, so no way to test the content, but we + // can test pointers and expected nils + + buf = rmr_get_src( NULL, src_buf ); // coverage test for nil msg check + errors += fail_not_nil( buf, "rmr_get_src returned a pointer when given a nil message" ); + + buf = rmr_get_src( mbuf, NULL ); // coverage test for nil dst check + errors += fail_not_nil( buf, "rmr_get_src returned a pointer when given a nil dest buffer" ); + + buf = rmr_get_src( mbuf, src_buf ); + errors += fail_not_equal( buf, src_buf, "rmr_get_src didn't return expexted buffer pointer" ); return errors > 0; // overall exit code bad if errors -- 2.16.6