Correct inability to extend payload for rts msg 94/1294/4 r2-temp 1.10.2
authorE. Scott Daniels <daniels@research.att.com>
Thu, 31 Oct 2019 13:20:33 +0000 (09:20 -0400)
committerE. Scott Daniels <daniels@research.att.com>
Thu, 31 Oct 2019 15:04:32 +0000 (11:04 -0400)
This change allows the application to increase the payload size
of an existing message buffer. This is needed when and application
must use the return to sender (rts) function to send a payload which
is larger than the payload in the received message.

Changes include a new RMR function: rmr_realloc_payload() and
changes to the python wrapper which will resize the payload if
the python application attempts to populate a message buffer which
does not have sufficent size.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ia55c0ff1d5ab3ce73292c00264df7bc16e229d3a

31 files changed:
CHANGES
CMakeLists.txt
doc/CMakeLists.txt
doc/src/man/rmr.7.xfm
doc/src/man/rmr_realloc_payload.3.xfm [new file with mode: 0644]
doc/src/man/rmr_rts_msg.3.xfm
src/rmr/common/include/rmr.h
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/mbuf_api.c
src/rmr/nng/include/rmr_nng_private.h
src/rmr/nng/src/rmr_nng.c
src/rmr/nng/src/sr_nng_static.c
test/app_test/Makefile
test/app_test/ex_rts_receiver.c [new file with mode: 0644]
test/app_test/rebuild.ksh
test/app_test/run_all.ksh
test/app_test/run_app_test.ksh
test/app_test/run_call_test.ksh
test/app_test/run_exrts_test.ksh [new file with mode: 0644]
test/app_test/run_lcall_test.ksh
test/app_test/run_multi_test.ksh
test/app_test/run_rr_test.ksh
test/app_test/run_rts_test.ksh
test/app_test/test_support.c [new file with mode: 0644]
test/app_test/v_sender.c [new file with mode: 0644]
test/mbuf_api_static_test.c
test/rmr_nng_api_static_test.c
test/sr_nng_static_test.c
test/test_nng_em.c
test/test_support.c
test/unit_test.ksh

diff --git a/CHANGES b/CHANGES
index aead0f9..d3b1588 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,10 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2019 October 31; version 1.10.2
+       Provide the means to increase the payload size of a received message
+       without losing the data needed to use the rmr_rts_msg() funciton.
+
 2019 October 21; version 1.10.1
        Fix to prevent null message buffer from being returned by the timeout
        receive function if the function is passed one to reuse.
index f38565e..da7c914 100644 (file)
@@ -36,7 +36,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "10" )
-set( patch_level "1" )
+set( patch_level "2" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 77cea9b..55c5098 100644 (file)
@@ -83,6 +83,7 @@ if( BUILD_DOC )
                rmr_mt_call.3
                rmr_mt_rcv.3
                rmr_get_srcip.3
+               rmr_realloc_payload.3
                rmr_trace_ref.3
                rmr_set_stimeout.3
                rmr_get_xact.3
index 1d87f73..26738e1 100644 (file)
@@ -49,7 +49,7 @@ applications.
 &space
 RMr functions do provide for the ability to respond to the specific source
 instance of a message allowing for either a request response, or call
-response relationship when needed. 
+response relationship when needed.
 
 
 &h3(The Route Table)
@@ -69,7 +69,7 @@ by RMr and used for mapping messages to end points.
 &h3(Environment)
 To enable configuration of the library behaviour outside of direct user application
 control, RMr supports a number of environment variables which provide information
-to the library. 
+to the library.
 The following is a list of the various environment variables, what they control
 and the defaults which RMr uses if undefined.
 
@@ -78,7 +78,7 @@ and the defaults which RMr uses if undefined.
        value to 0. When set to 1, or missing from the environment, RMR will invoke the
        connection interface in the transport mechanism using the non-blocking (asynch)
        mode.  This will likely result in many "soft failures" (retry) until the connection
-       is established, but allows the application to continue unimpeeded should the 
+       is established, but allows the application to continue unimpeeded should the
        connection be slow to set up.
 
 &di(RMR_BIND_IF) This provides the interface that RMr will bind listen ports to allowing
@@ -97,7 +97,7 @@ and the defaults which RMr uses if undefined.
 
 &di(RMR_SEED_RT) This is used to supply a static route table which can be used for
        debugging, testing, or if no route table generator process is being used to
-       supply the route table. 
+       supply the route table.
        If not defined, no static table is used and RMr will not report &ital(ready)
        until a table is received.
 &end_dlist
@@ -126,6 +126,7 @@ rmr_fib(3),
 rmr_has_str(3),
 rmr_tokenise(3),
 rmr_mk_ring(3),
+rmr_realloc_payload(3),
 rmr_ring_free(3),
 rmr_set_trace(3),
 rmr_torcv_msg(3),
diff --git a/doc/src/man/rmr_realloc_payload.3.xfm b/doc/src/man/rmr_realloc_payload.3.xfm
new file mode 100644 (file)
index 0000000..193d549
--- /dev/null
@@ -0,0 +1,139 @@
+.if false
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+.fi
+
+
+.if false
+       Mnemonic        rmr_realloc_payload.3.xfm
+       Abstract        The manual page for the rmr_realloc_payload function.
+       Author          E. Scott Daniels
+       Date            30 October 2019
+.fi
+
+.gv e LIB lib
+.im &{lib}/man/setup.im
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+       rmr_realloc_payload
+
+&h2(SYNOPSIS )
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* msg, int new_len, int copy, int clone );
+&ex_end
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_realloc_payload) function will return a pointer to an RMR message
+buffer struct (rmr_mbuf_t) which has a payload large enough to accomodate &ital(new_len)
+bytes.
+If necessary, the underlying payload is reallocated, and the bytes from the original
+payload are copied if the &ital(copy) parameter is true (1).
+If the message passed in has a payload large enough, there is no additional
+memory allocation and copying.
+
+&h3(Cloning The Message Buffer)
+This function can also be used to generate a separate copy of the original message,
+with the desired payload size, without destroying the original message buffer or the
+original payload.
+A standalone copy is made only when the &ital(clone) parameter is true (1).
+When cloning, the payload is copied to the cloned message &bold(only) if the &ital(copy)
+parameter is true.
+
+&h3(Message Buffer Metadata)
+The metadata in the original message buffer (message type, subscription ID, and payload
+length) will be preserved if the &ital(copy) parameter is true.
+When this parameter is not true (0), then these values are set to the
+uninitialised value (-1) for type and ID, and the length is set to 0.
+
+&h2(RETURN VALUE)
+The &cw(rmr_realloc_payload) function returns a pointer to the message buffer with the
+payload which is large enough to hold &ital(new_len) bytes.
+If the &ital(clone) option is true, this will be a pointer to the newly cloned
+message buffer; the original message buffer pointer may still be used to referenced
+that message.
+It is the calling application's responsibility to free the memory associateed with
+both messages using the rmr_free_msg() function.
+&space
+
+When the &ital(clone) option is not used, it is still good practice by the calling
+application to capture and use this reference as it is possible that the message
+buffer, and not just the payload buffer, was reallocated.
+
+In the event of an error, a nil pointer will be returned and the value of &ital(errno)
+will be set to reflect the problem.
+
+&h2(ERRORS)
+These value of &ital(errno) will reflect the error condition if a nil pointer is returned:
+
+&half_space
+&beg_dlist(.75i : ^&bold_font )
+&di(ENOMEM) Memory allocation of the new payload failed.
+&half_space
+
+&di(EINVAL) The pointer passed in was nil, or refrenced an invalid message, or the required
+                       length was not valid.
+&end_dlist
+
+
+&h2(EXAMPLE)
+The following code bit illustrates how this function can be used to
+reallocate a buffer for a return to sender acknowledgement message which
+is larger than the message received.
+
+&space
+&ex_start
+  if( rmr_payload_size( msg ) < ack_sz ) {              // received message too small for ack
+    msg = rmr_realloc_payload( msg, ack_sz, 0, 0 );     // reallocate the message with a payload big enough
+    if( msg == NULL ) {
+      fprintf( stderr, "[ERR] realloc returned a nil pointer: %s\n", strerror( errno ) );
+    } else {
+               // populate and send ack message
+       }
+}
+
+&ex_end
+
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_free_msg(3),
+rmr_init(3),
+rmr_payload_size(3),
+rmr_send_msg(3),
+rmr_rcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_fib(3),
+rmr_has_str(3),
+rmr_set_stimeout(3),
+rmr_tokenise(3),
+rmr_mk_ring(3),
+rmr_ring_free(3)
+.ju on
+
+
+.qu
+
index a5251d4..015c03c 100644 (file)
@@ -1,6 +1,6 @@
 .if false
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,7 +24,7 @@
 .fi
 
 .gv e LIB lib
-.im &{lib}/man/setup.im 
+.im &{lib}/man/setup.im
 
 &line_len(6i)
 
@@ -43,13 +43,26 @@ rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg );
 
 &h2(DESCRIPTION)
 The &cw(rmr_rts_msg) function sends a message returning it to the endpoint
-which sent the message rather than selecting an endpoint based on the 
-message type and routing table. 
+which sent the message rather than selecting an endpoint based on the
+message type and routing table.
 Other than this small difference, the behaviour is exactly the same as
 &cw(rmr_send_msg.)
 
 .** pull in common retry text
-.im &{lib}/man/retry.im 
+.im &{lib}/man/retry.im
+
+&h2(PAYLOAD SIZE)
+When crafting a response based on a received message, the user application must
+take care not to write more bytes to the message payload than the allocated message
+has.
+In the case of a received message, it is possible that the response needs to be
+larger than the payload associated with the inbound message.
+In order to use the return to sender function, the source infomration in the orignal
+message must be present in the response; information which cannot be added to a
+message buffer allocated through the standard RMR allocation function.
+To allocate a buffer with a larger payload, and which retains the necessary sender
+data needed by this function, the &ital(rmr_realloc_payload()) function must be
+used to extend the payload to a size suitable for the response.
 
 &h2(RETURN VALUE)
 On success, a new message buffer, with an empty payload, is returned for the application
@@ -58,18 +71,18 @@ The state in this buffer will reflect the overall send operation state and shoul
 &cw(RMR_OK.)
 
 &space
-If the state in the returned buffer is anything other than &cw(UT_OK,) the user application 
+If the state in the returned buffer is anything other than &cw(UT_OK,) the user application
 may need to attempt a retransmission of the message, or take other action depending on the
-setting of &cw(errno) as described below. 
+setting of &cw(errno) as described below.
 
 &space
-In the event of extreme failure, a NULL pointer is returned. In this case the value of 
-&cw(errno) might be of some use, for documentation, but there will be little that the 
+In the event of extreme failure, a NULL pointer is returned. In this case the value of
+&cw(errno) might be of some use, for documentation, but there will be little that the
 user application can do other than to move on.
 
 &h2(ERRORS)
 The following values may be passed back in the &ital(state) field of the returned message
-buffer. 
+buffer.
 
 &space
 &beg_dlist(.75i : ^&bold_font )
index d43d3ba..a83ed90 100644 (file)
@@ -144,6 +144,7 @@ extern unsigned char*  rmr_get_src( rmr_mbuf_t* mbuf, unsigned char* dest );
 extern unsigned char* rmr_get_srcip( rmr_mbuf_t* msg, unsigned char* dest );
 extern unsigned char*  rmr_get_xact( rmr_mbuf_t* mbuf, unsigned char* dest );
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* mbuf, int new_tr_size );
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone );
 extern int rmr_str2meid( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str );
index 8694574..a20e2e7 100644 (file)
@@ -146,7 +146,7 @@ typedef struct uta_ctx  uta_ctx_t;
 */
 typedef struct {
        int32_t mtype;                                          // message type  ("long" network integer)
-       int32_t plen;                                           // payload length
+       int32_t plen;                                           // payload length (sender data length in payload)
        int32_t rmr_ver;                                        // our internal message version number
        unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
        unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
index e88ab23..f730958 100644 (file)
@@ -22,7 +22,9 @@
        Mnemonic:       mbuf_api.c
        Abstract:       These are common functions which work only on the mbuf and
                                thus (because they do not touch an endpoint or context)
-                               can be agnostic to the underlying transport.
+                               can be agnostic to the underlying transport, or the transport
+                               layer provides a transport specific function (e.g. payload 
+                               reallocation).
 
        Author:         E. Scott Daniels
        Date:           8 February 2019
@@ -447,3 +449,4 @@ extern unsigned char* rmr_get_srcip( rmr_mbuf_t* msg, unsigned char* dest ) {
 
        return rstr;
 }
+
index 9753931..bf59574 100644 (file)
@@ -124,6 +124,7 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  );
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
+static rmr_mbuf_t* realloc_payload( rmr_mbuf_t* mbuf, int new_len, int copy, int clone );
 
 
 
index d9c4c06..7eafc81 100644 (file)
@@ -1086,3 +1086,31 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
 
        return mbuf;
 }
+
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender. 
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the 
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone 
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is 
+       copied to the reallocated payload.  If the flag is not set, then the 
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
index 3435cb3..3146fa2 100644 (file)
@@ -296,7 +296,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
 
                default:                                                                                        // current message always caught  here
                        hdr = nm->header;
-                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
+                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
                        nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
                        break;
        }
@@ -306,6 +306,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
+       if( DEBUG ) fprintf( stderr, "[DBUG] clone values: mty=%d sid=%d len=%d alloc=%d\n", nm->mtype, nm->sub_id, nm->len, nm->alloc_len );
 
        nm->xaction = hdr->xid;                                                                 // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
@@ -384,6 +385,113 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        return nm;
 }
 
+/*
+       Realloc the message such that the payload is at least payload_len bytes.  If the current
+       payload size is large enough, no action is taken. If copy is false, the actual payload
+       bytes are NOT copied.  This allows a caller to realloc for a response message (to retain
+       the source information which would be lost on a simple alloc) which has no need for the
+       original message.
+
+       The old message buffer will reference the new underlying transport, and the original payload
+       will be lost unless clone is set to true. If clone is true, the old message buffer will continue
+       to reference the original payload, and a new message buffer will be allocated (even if the
+       payload size in the old message was larger than requested).
+
+       The return value is a pointer to the message with at least payload_len bytes allocated. It 
+       will be the same as the old_message if clone is false.
+
+       CAUTION:
+       If the message is not a message which was received, the mtype, sub-id, length values in the
+       RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
+       mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
+       resetting information in the mbuffer, this funciton will reset the mbuf values from the current
+       settings and NOT from the copied RMR header in transport buffer.
+*/
+static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
+       rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* omhdr;              // old message header
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+       int old_psize = 0;              // current size of message for payload
+       int     hdr_len = 0;            // length of RMR header in old msg
+       void*   old_tp_buf;             // pointer to the old tp buffer
+       int     free_tp = 1;            // free the transport buffer (old) when done (when not cloning)
+       int             old_mt;                 // msg type and sub-id from the message passed in
+       int             old_sid;
+       int             old_len;
+
+       if( old_msg == NULL || payload_len <= 0 ) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       old_mt = old_msg->mtype;
+       old_sid = old_msg->sub_id;
+       old_len = old_msg->len;
+       old_psize = old_msg->alloc_len - RMR_HDR_LEN( old_msg->header );                                // allocated transport size less the header and other data bits
+       if( !clone  && payload_len <= old_psize ) {                                                             // old message is large enough, nothing to do
+               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
+               return old_msg;
+       }
+
+       hdr_len = RMR_HDR_LEN( old_msg->header );
+       old_tp_buf = old_msg->tp_buf;
+
+       if( clone ) {
+               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: cloning message\n" );
+               free_tp = 0;
+
+               nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
+               if( nm == NULL ) {
+                       fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
+                       return NULL;
+               }
+               memset( nm, 0, sizeof( *nm ) );
+       } else {
+               nm = old_msg;
+       }
+
+       omhdr = old_msg->header;
+       mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
+
+       if( DEBUG ) fprintf( stderr, "[DBUG] reallocate for payload increase. new message size: %d\n", (int) mlen );    
+       if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
+               fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
+               return NULL;
+       }
+
+       nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
+       SET_HDR_LEN( nm->header );
+
+       if( copy ) {                                                                                                                            // if we need to copy the old payload too
+               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
+               memcpy( nm->header, omhdr, sizeof( char ) * old_psize );
+       } else {                                                                                                                                        // just need to copy header
+               if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
+               memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( nm->header ) );
+       }
+
+       ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
+
+       if( !copy ) {
+               nm->mtype = -1;                                         // didn't copy payload, so mtype and sub-id are invalid
+               nm->sub_id = -1;
+               nm->len = 0;                                            // and len is 0
+       } else {
+               nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
+               nm->mtype = old_mt;
+               nm->sub_id = old_sid;
+       }
+
+       if( free_tp ) {
+               free( old_tp_buf );                             // we did not clone, so free b/c no references
+       }
+
+       return nm;
+}
+
 /*
        This is the receive work horse used by the outer layer receive functions.
        It waits for a message to be received on our listen socket. If old msg
index e893aae..019f3e2 100644 (file)
@@ -1,4 +1,5 @@
-#==================================================================================
+#
+#=================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
 #
@@ -32,6 +33,8 @@
 #.SHELLFLAGS = -e      # hosed on some flavours so keep it off
 SHELL ?= /bin/ksh
 
+ex_cflags = $(shell echo $$EX_CFLAGS )
+
 build_path ?= ../../.build
 header_path := $(shell find $(build_path) -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' )
 
@@ -44,7 +47,7 @@ LIBRARY_PATH = $(LD_LIBRARY_PATH)
 
 
 .PHONY: all
-all: sender receiver caller mt_receiver
+all: sender receiver caller mt_receiver v_sender ex_rts_receiver
 
 receiver: receiver.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
@@ -55,6 +58,12 @@ mt_receiver: receiver.c
 lreceiver: lreceiver.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
+ex_rts_receiver: ex_rts_receiver.c
+       gcc $(ex_cflags) -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
+v_sender: v_sender.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
 sender: sender.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
@@ -65,8 +74,6 @@ lcaller: lcaller.c
        gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@  -lrmr_nng -lnng -lpthread -lm
 
 lsender: lsender.c
-       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@  -lrmr_nng -lnng -lpthread -lm
-
 
 # clean removes intermediates; nuke removes everything that can be built
 .PHONY: clean nuke
diff --git a/test/app_test/ex_rts_receiver.c b/test/app_test/ex_rts_receiver.c
new file mode 100644 (file)
index 0000000..14ae26c
--- /dev/null
@@ -0,0 +1,209 @@
+// vim: ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       ex_rts_receiver.c
+       Abstract:       This receiver is a bit more specitalised with respect to 
+                               expclicitly testing the abilty to expand a message payload
+                               length when it is necessary for an appliction to send a response
+                               to a message originator where the rts payload is larger than the
+                               received message.
+
+                               This specific test is accomplished by responding to all messages
+                               with a response which is 1024 bytes in length. This means that
+                               any message received which is smaller than 1K bytes will have to
+                               be expanded.  Further, all 1024 bytes will be generated into the
+                               response, with a checksum, and the expectation is that the message
+                               originator (assumed to be the v_sender) will verify that the 
+                               message size is as expected, and that the checksum matches.
+
+                               This test is concerned only with functionality, and not latency
+                               or speed, and as such there as not been any attempt to make the 
+                               building of responses etc. efficent.
+                               
+                               Compile time options:
+                                               DEBUG: write extra output about bad messages etc.
+                                               MTC:    enable the multi-thraded call support in RMR
+
+       Date:           28 October 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+
+#include <rmr/rmr.h>
+
+#define HDR_SIZE 64                                                    // the size of the header we place into the message
+#define MSG_SIZE 1024                                          // the size of the message that will be sent (hdr+payload)
+#define DATA_SIZE (MSG_SIZE-HDR_SIZE)          // the actual 'data' length returned in the ack msg
+
+
+#ifndef DEBUG
+#define DEBUG 0
+#endif
+
+#include "test_support.c"                                      // checksum, header gen, etc.
+
+int main( int argc, char** argv ) {
+       void* mrc;                                              // msg router context
+       rmr_mbuf_t* msg = NULL;                         // message received
+       int     i;
+       int             j;
+       int             state;
+       int             errors = 0;
+       char*   listen_port = "4560";
+       long    count = 0;                                      // total received
+       long    good = 0;                                       // good palyload buffers
+       long    bad = 0;                                        // payload buffers which were not correct
+       long    bad_sid = 0;                            // bad subscription ids
+       long    resized = 0;                            // number of messages we had to resize before replying
+       long    timeout = 0;
+       long    rpt_timeout = 0;                        // next stats message
+       char*   data;
+       int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
+       int             rt_count = 0;                           // retry count
+       long    ack_count = 0;                          // number of acks sent
+       int             count_bins[11];                         // histogram bins based on msg type (0-10)
+       char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
+       char    sbuf[128];                                      // short buffer
+       char    ack_header[64];                         // we'll put checksum and maybe other stuff in the header of the response for validation
+       char    ack_data[DATA_SIZE];            // data randomly generated for each response
+       int             need;                                           // amount of something that we need
+       int             sv;                                                     // checksum valu
+
+       data = getenv( "RMR_RTG_SVC" );
+       if( data == NULL ) {
+               setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
+       }
+
+       if( argc > 1 ) {
+               listen_port = argv[1];
+       }
+
+       memset( count_bins, 0, sizeof( count_bins ) );
+
+       fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+
+#ifdef MTC
+       fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+#else
+       fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
+#endif
+       if( mrc == NULL ) {
+               fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       timeout = time( NULL ) + 20;
+       while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
+               fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<RCVR> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
+
+       timeout = time( NULL ) + 20;
+       while( 1 ) {
+               msg = rmr_torcv_msg( mrc, msg, 1000 );                          // break about every 1s so that if sender never starts we eventually escape
+
+               if( msg ) {
+                       if( msg->state == RMR_OK ) {
+                               if( validate_msg( msg->payload, msg->len ) ) {          // defrock the header, then verify lengths and chksum
+                                       good++;
+                               } else {
+                                       bad++;
+                               }
+                               count++;                                                                                // total messages received for stats output
+
+                               if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+                                       count_bins[msg->mtype]++;
+                               }
+
+                               need = generate_payload( ack_header, ack_data, 0, 0 );          // create an ack w/ random payload in payload, and set data in header
+                               if( rmr_payload_size( msg ) < need ) {                                  // received message too small
+                                       resized++;
+                                       msg = rmr_realloc_payload( msg, need, 0, 0 );           // reallocate the message with a payload big enough
+                                       if( msg == NULL ) {
+                                               fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
+                                               continue;
+                                       }
+                               }
+
+                               fill_payload( msg, ack_header, 0, ack_data, 0 );                // push headers (with default lengths) into message
+                               msg->mtype = 99;
+                               msg->sub_id = -1;
+                               msg->len = need;
+
+                               msg = rmr_rts_msg( mrc, msg );                                                  // return our ack message
+                               rt_count = 1000;
+                               while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
+                                       if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
+                                               sleep( 1 );
+                                       }
+                                       rt_count--;
+                                       msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
+                               }
+                               if( msg && msg->state == RMR_OK ) {                                                     // if it eventually worked
+                                       ack_count++;
+                               }
+
+                               timeout = time( NULL ) +20;
+                       }
+               }
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<RCVR> stopping, no recent messages received\n" );
+                       break;
+               } else {
+                       if( time( NULL ) > rpt_timeout ) {
+                               fprintf( stderr, "<RCVR> %ld msgs=%ld good=%ld  acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
+
+                               rpt_timeout = time( NULL ) + 5;
+                       }
+               }
+       }
+
+       wbuf[0] = 0;
+       for( i = 0; i < 11; i++ ) {
+               snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+               strcat( wbuf, sbuf );
+       }
+
+       fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  resized=%ld bad-sub_id=%ld\n", 
+               !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized,  bad_sid );
+
+       sleep( 2 );                                                                     // let any outbound acks flow before closing
+
+       rmr_close( mrc );
+       return !!(errors + bad);                        // bad rc if any are !0
+}
+
index 5a750cb..1cae81b 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env ksh
-# :vi ts=4 sw=4 noet :
+# vim: ts=4 sw=4 noet :
 #==================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
index 990694d..d779d65 100644 (file)
@@ -1,12 +1,49 @@
+#!/usr/bin/env ksh
+# vim: ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
 
 # run all of the tests, building rmr before the first one if -B is on the command line.
 
+function run_test {
+       if [[ -n $capture_file ]]
+       then
+               if ! ksh $@ >>$capture_file 2>&1
+               then
+                       echo "[FAIL] test failed; see $capture_file"
+                       (( errors++ ))
+               fi
+       else
+               if ! ksh $@
+               then
+                       (( errors++ ))
+               fi
+       fi
+}
+
 build=""
+errors=0
 
 while [[ $1 == "-"* ]]
 do
-       case $1 in 
+       case $1 in
                -B)     build="-B";;
+               -e)     capture_file=$2; >$capture_file; shift;;
                -i)     installed="-i";;
 
                *)      echo "'$1' is not a recognised option and is ignored";;
@@ -15,12 +52,26 @@ do
        shift
 done
 
-set -e
-echo "---- app -------------"
-ksh run_app_test.ksh -v $installed $build
-echo "----- multi -----------"
-ksh run_multi_test.ksh
-echo "----- round robin ----"
-ksh run_rr_test.ksh
-echo "----- rts ------------"
-ksh run_rts_test.ksh -s 20
+echo "----- app --------------------"
+run_test run_app_test.ksh -v $installed $build
+
+echo "----- multi ------------------"
+run_test run_multi_test.ksh
+
+echo "----- round robin -----------"
+run_test run_rr_test.ksh
+
+echo "----- rts -------------------"
+run_test run_rts_test.ksh -s 20
+
+echo "----- extended payload ------"
+run_test run_exrts_test.ksh -d 10 -n 1000
+
+if (( errors == 0 ))
+then
+       echo "[PASS] all test pass"
+else
+       echo "[FAIL] one or more application to application tests failed"
+fi
+
+exit $(( !! errors ))
index 3da098a..f83270a 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env ksh
-# :vi ts=4 sw=4 noet :
+# vim: ts=4 sw=4 noet :
 #==================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
index 40a58e6..6e03a4b 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env ksh
-# :vi ts=4 sw=4 noet :
+# vim: ts=4 sw=4 noet :
 #==================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
diff --git a/test/app_test/run_exrts_test.ksh b/test/app_test/run_exrts_test.ksh
new file mode 100644 (file)
index 0000000..08807fc
--- /dev/null
@@ -0,0 +1,225 @@
+#!/usr/bin/env ksh
+# vim: ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       run_exrts_test.ksh
+#      Abstract:       This is a simple script to set up and run the v_send/ex_rts_receive
+#                              processes for some library validation on top of nng. This test
+#                              starts these processes to verify that when a receiver has an ack 
+#                              message larger than the received message it is able to allocate
+#                              a new payload which can be returned via the RMR return to sender
+#                              function.
+#
+#                              Example command line:
+#                                      # run with 10 caller threads sending 10,000 meessages each, 
+#                                      # 5 receivers, and a 10 mu-s delay between each caller send
+#                                      ksh ./run_lcall_test.ksh -d 10 -n 10000 -r 5 -c 10
+#
+#      Date:           28 October 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+       ./v_sender ${nmsg:-10} ${delay:-100000} ${mtype_start_stop:-0:1} 
+       echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance so we can keep logs separate
+function run_rcvr {
+       typeset port
+
+       port=$(( 4460 + ${1:-0} ))
+       export RMR_RTG_SVC=$(( 9990 + $1 ))
+       ./ex_rts_receiver $port
+       echo $? >/tmp/PID$$.$1.rrc
+}
+
+#      Drop a contrived route table in such that the sender sends each message to n
+#      receivers.
+#
+function set_rt {
+       typeset port=4460
+       typeset groups="localhost:4460"
+       for (( i=1; i < ${1:-3}; i++ ))
+       do
+               groups="$groups,localhost:$((port+i))"
+       done
+
+       cat <<endKat >ex_rts.rt
+               newrt | start
+               mse |0 | 0 | $groups
+               mse |1 | 10 | $groups
+               mse |2 | 20 | $groups
+               rte |3 | $groups
+               rte |4 | $groups
+               rte |5 | $groups
+               rte |6 | $groups
+               rte |7 | $groups
+               rte |8 | $groups
+               rte |9 | $groups
+               rte |10 | $groups
+               rte |11 | $groups
+               newrt | end
+endKat
+
+       cat ex_rts.rt
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]]         # we need the real host name in the local.rt; build one from mask if not there
+then
+       hn=$(hostname)
+       sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+export EX_CFLAGS=""
+export RMR_ASYNC_CONN=0        # ensure we don't lose first msg as drops waiting for conn look like errors
+nmsg=100                                       # total number of messages to be exchanged (-n value changes)
+delay=500                                      # microsec sleep between msg 1,000,000 == 1s
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=1                                       # this is sane, but -r allows it to be set up
+use_installed=0
+mtype_start_stop=""
+
+while [[ $1 == -* ]]
+do
+       case $1 in
+               -c)     cthreads=$2; shift;;
+               -B)     rebuild=1;;
+               -d)     delay=${2//,/}; shift;;                         # delay in micro seconds allow 1,000 to make it easier on user
+               -i)     use_installed=1;;
+               -m)     mtype_start_stop="$2"; shift;;
+               -M)     mt_call="EX_CFLAGS=-DMTC=1";;                                   # turn on mt-call receiver option
+               -n)     nmsg=$2; shift;;
+               -r) nrcvrs=$2; shift;;
+               -v)     verbose=1;;
+
+               *)      echo "unrecognised option: $1"
+                       echo "usage: $0 [-B] [-c caller-threads] [-d micor-sec-delay] [-i] [-M] [-m mtype] [-n num-msgs] [-r num-receivers] [-v]"
+                       echo "  -B forces a rebuild which will use .build"
+                       echo "  -i will use installed libraries (/usr/local) and cause -B to be ignored if supplied)"
+                       echo "  -m mtype  will set the stopping (max) message type; sender will loop through 0 through mtype-1"
+                       echo "  -m start:stop  will set the starting and stopping mtypes; start through stop -1"
+                       echo "  -M enables mt-call receive processing to test the RMR asynch receive pthread"
+                       echo ""
+                       echo "The receivers will run until they have not received a message for a few seconds. The"
+                       echo "sender will send the requested number of messages, 10 by default."
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+
+if (( verbose ))
+then
+       echo "2" >.verbose
+       export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( use_installed ))                 # point at installed library
+then
+       export LD_LIBRARY_PATH=/usr/local/lib
+       export LIBRARY_PATH=$LD_LIBRARY_PATH
+else
+       if (( rebuild ))
+       then
+               build_path=../../.build         # if we rebuild we can insist that it is in .build :)
+               set -e
+               ksh ./rebuild.ksh
+               set +e
+       else
+               build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
+
+               if [[ ! -d $build_path ]]
+               then
+                       echo "cannot find build in: $build_path"
+                       echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+                       exit 1
+               fi
+       fi
+
+       if [[ -d $build_path/lib64 ]]
+       then
+               export LD_LIBRARY_PATH=$build_path:$build_path/lib64
+       else
+               export LD_LIBRARY_PATH=$build_path:$build_path/lib
+       fi
+fi
+
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./ex_rts.rt
+
+set_rt $nrcvrs                                                                                                         # set up the rt for n receivers
+
+if ! make $mt_call -B v_sender ex_rts_receiver >/tmp/PID$$.log 2>&1                    # for sanity, always rebuild test binaries
+then
+       echo "[FAIL] cannot make binaries"
+       cat /tmp/PID$$.log
+       rm -f /tmp/PID$$*
+       exit 1
+fi
+
+echo "<RUN> binaries built"
+
+for (( i=0; i < nrcvrs; i++  ))
+do
+       run_rcvr $i &
+done
+
+sleep 2                                # let receivers init so we don't shoot at empty targets
+if (( verbose ))
+then
+       netstat -an|grep LISTEN
+fi
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ ))         # collect return codes
+do
+       head -1 /tmp/PID$$.$i.rrc | read x
+       (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+       echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+       echo "[PASS] sender rc=$src  receiver rc=$rrc"
+       rm -f ex_rts.rt
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+
index a1cafb0..b01aba2 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env ksh
-# :vi ts=4 sw=4 noet :
+# vim: ts=4 sw=4 noet :
 #==================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
index fa476ec..99a147b 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env ksh
-# :vi ts=4 sw=4 noet :
+# vim: ts=4 sw=4 noet :
 #==================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
index c1714a9..329d59f 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env ksh
-# :vi ts=4 sw=4 noet :
+# vim: ts=4 sw=4 noet :
 #==================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
index f6e16b4..4ec6afe 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env ksh
-# :vi ts=4 sw=4 noet :
+# vim: ts=4 sw=4 noet :
 #==================================================================================
 #    Copyright (c) 2019 Nokia
 #    Copyright (c) 2018-2019 AT&T Intellectual Property.
diff --git a/test/app_test/test_support.c b/test/app_test/test_support.c
new file mode 100644 (file)
index 0000000..2bd0a35
--- /dev/null
@@ -0,0 +1,315 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       test_support.c
+       Abstract:       Support functions for app test programmes.
+
+                                       sum() compute a simple checksum over a buffer
+                                       split() split an ascii buffer at the first |
+                                       generate_payload() build a header and data buffer generate
+                                               a random data buffer with a header containing
+                                               the checksum, and lengths.
+                                       validate_msg() given a message of <hdr><data> validate
+                                               that the message is the correct length and chk sum.
+
+       Date:           28 October 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <ctype.h>
+
+#ifndef _supp_tools_c
+#define _supp_tools_c
+
+#ifndef DEBUG
+#define DEBUG 0
+#endif
+
+// defaults if the test app doesn't set these
+#ifndef HDRSIZE
+#define HDR_SIZE 64                                                    // the size of the header we place into the message
+#endif 
+
+#ifndef MSG_SIZE
+#define MSG_SIZE 1024                                          // the size of the message that will be sent (hdr+payload)
+#endif
+
+#ifndef DATA_SIZE
+#define DATA_SIZE (HDR_SIZE-HDR_SIZE)          // the actual 'data' length returned in the ack msg
+#endif
+
+
+void spew( char* buf, int len ) {
+       int i;
+       char wbuf[1024];                                // slower, but buffer so that mult writers to the tty don't jumble (too much)
+       char bbuf[10];
+
+       wbuf[0] = 0;
+       for( i = 0; i < len; i++ ) {
+               if( i % 16 == 0 ) {
+                       fprintf( stderr, "%s\n", wbuf );
+                       wbuf[0] = 0;
+               }
+               sprintf( bbuf, "%02x ", (unsigned char) *(buf+i) );
+               strcat( wbuf, bbuf );
+       }
+
+       fprintf( stderr, "%s\n", wbuf );
+}
+
+/*
+       Parse n bytes and generate a very simplistic checksum. 
+       Returns the checksum.
+*/
+static int sum( char* bytes, int len ) {
+       int sum = 0;
+       int     i = 0;
+
+       while( i < len ) {
+               sum += *(bytes++) + i;
+               i++;
+       }
+
+       return sum % 255;
+}
+
+/*
+       Split the message at the first sep and return a pointer to the first
+       character after.
+*/
+static char* split( char* str, char sep ) {
+       char*   s;
+
+       s = strchr( str, sep );
+
+       if( s ) {
+               return s+1;
+       }
+
+       fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
+       return NULL;
+}
+
+/*
+       Generate all buffers that will be combined into a single message
+       payload. This is a random set of bytes into payload and then populate
+       the header with the payload length and chksum. The header generated is
+       placed into a fixed length buffer and consists of three values:
+               checksum result of payload
+               header size (the fixed size, though info is a string)
+               payload size
+
+       The message can be validated by computing the checksum of the
+       message after the HDR_SIZE, and comparing that with the value
+       passed.  The values in the header are all ASCII so they can
+       easily be printed on receipt as a debugging step if needed.
+
+       hs_over and ds_over allow the caller to override the header and data
+       size constants if set to a positive number
+
+       The return value is the size of the message payload needed to
+       hold both the header and the payload.
+*/
+int generate_payload( char* hdr, char* data, int hs_over, int ds_over ) {
+       int i;
+       long r;
+       int sv;
+       int     hsize;
+       int dsize;
+
+       hsize = hs_over <= 0 ? HDR_SIZE : hs_over;              // set values from overrides or defaults
+       dsize = ds_over <= 0 ? DATA_SIZE : ds_over;
+
+       memset( hdr, 0, sizeof( char ) * hsize );
+       for( i = 0; i < dsize; i++ ) {
+               r = random();
+               data[i] = r & 0xff;
+       }
+
+       sv = sum( data, dsize ); 
+       snprintf( hdr, sizeof( char ) * hsize, "%d %d %d |", sv, hsize, dsize );
+
+       return hsize + dsize;
+}
+
+/*
+       Generate a header for a given payload buffer. THe header size and data size
+       override values default to the HDR_SIZE and DATA_SIZE constants if a value
+       is <= 0; The data is checksummed and  the header buffer is populated
+       (see generate_payload() for detailed description.
+
+       The return is the number of bytes required for the full payload of both
+       header and data.
+*/
+int generate_header( char* hdr, char* data, int hs_over, int ds_over ) {
+       int sv;
+       int     hsize;
+       int dsize;
+       int     wrote;
+
+       hsize = hs_over <= 0 ? HDR_SIZE : hs_over;              // set values from overrides or defaults
+       dsize = ds_over <= 0 ? DATA_SIZE : ds_over;
+
+       memset( hdr, 0, sizeof( char ) * hsize );
+       sv = sum( data, dsize ); 
+       wrote = snprintf( hdr, sizeof( char ) * hsize, "%d %d %d |", sv, hsize, dsize );
+       if( wrote >= hsize ) {
+               fprintf( stderr, "<ERR>  header overflow\n" );
+               hdr[hsize-1] = 0;
+       }
+
+       return hsize + dsize;
+}
+
+/*
+       Convenience function to push a header/data into an RMR mbuf. It is assumed that the
+       mbuf is large enough; no checks are made. If either length is <= 0, then the 
+       default (constant value) is used.
+*/
+void fill_payload( rmr_mbuf_t* msg, char* hdr, int hdr_len, char *data, int data_len ) {
+       char*   payload;
+
+       if( msg == NULL ) {
+               fprintf( stderr, "<TEST> fill payload: msg passed in was nil\n" );
+               return;
+       }
+
+       hdr_len = hdr_len <= 0 ? HDR_SIZE : hdr_len;            // set values from input; else use defaults
+       data_len = data_len <= 0 ? DATA_SIZE : data_len;
+
+       if( hdr_len <= 0 || data_len <= 0 ) {
+               fprintf( stderr, "<TEST> fill payload: header or date len invalid: hdr=%d data=%d\n", hdr_len, data_len );
+               return;
+       }
+
+       payload = msg->payload;
+
+       memcpy( payload, hdr, hdr_len > 0 ? hdr_len : HDR_SIZE );
+       payload += hdr_len;
+       memcpy( payload, data, data_len > 0 ? data_len : DATA_SIZE );
+}
+
+/*
+       This function accepts an RMR message payload assumed to be header and user data,  in the 
+       format: <hdr><data> where <hdr> consists of a string with three space separated values:
+               checksum
+               hdr size
+               data size
+
+       Validation is:
+               ensure that the rmr_len matches the header size and data size which are parsed from
+               the message.  If sizes match, a checksum is performed on the data and the result
+               compared to the checksum value in the header.
+*/
+int validate_msg( char* buf, int rmr_len ) {
+       char*   tok;
+       char*   tok_mark = NULL;
+       char*   search_start;
+       char*   ohdr = NULL;    // original header as we trash the header parsing it
+       int             ex_sv;                  // expected checksum value (pulled from received header)
+       int             sv;                             // computed checksum value
+       int             ex_mlen = 0;    // expected msg_len (computed from the header, compared with rmr_len)
+       int             hdr_len;                // length of header to skip
+       int             data_len;               // length of data received
+       int             i;
+       int             good = 0;
+
+       if( buf == 0 && rmr_len <= 0 ) {
+               return 0;
+       }
+
+       for( i = 0; i < HDR_SIZE; i++ ) {               // for what we consider the dfault header, we must see ALL ascci and a trail zero before end
+               if( *(buf+i) == 0 ) {
+                       good = 1;
+                       break;
+               }
+
+               if( ! isprint( *buf ) ) {  // we expect the header to be a zero terminated string
+                       fprintf( stderr, "<TEST> validate msg: header is not completely ASCII i=%d\n", i );
+                       spew( buf, 64 );
+                       return 0;
+               }
+       }
+
+       if( ! good ) {
+               fprintf( stderr, "<TEST> validate msg: didn't find an acceptable header (not nil terminated)\n" );
+               return 0;
+       }
+
+       ohdr = strdup( buf );           // must copy so we can trash
+
+       search_start = buf;
+       for( i = 0; i < 3; i++ ) {
+               tok = strtok_r( search_start, " ", &tok_mark );
+               search_start = NULL;
+               if( tok ) {
+                       switch( i ) {
+                               case 0:
+                                       ex_sv = atoi( tok );
+                                       break;
+
+                               case 1:
+                                       hdr_len = atoi( tok );          // capture header length        
+                                       ex_mlen = hdr_len;                      // start to compute total msg len
+               
+                                       break;
+
+                               case 2:
+                                       data_len = atoi( tok );         // add data length      
+                                       ex_mlen += data_len;
+                                       break;
+                       }
+               }
+       }
+
+       if( ex_mlen != rmr_len ) {
+               fprintf( stderr, "[FAIL] received message length did not match hdr+data lengths, rmr_len=%d data follows:\n", rmr_len );
+               if( ! isprint( ohdr ) ) {
+                       fprintf( stderr, "[CONT] header isn't printable\n" );
+                       spew( ohdr, 64 );
+               } else {
+                       fprintf( stderr, "[CONT] header: (%s)\n", ohdr );
+                       fprintf( stderr, "[CONT] computed length: %d (expected)\n", ex_mlen );
+               }
+               free( ohdr );
+               return 0;
+       }
+
+       if( DEBUG ) {
+               fprintf( stderr, "[OK]  message lengths are good\n" );
+       }
+               
+       tok = buf + hdr_len;
+       //fprintf( stderr, ">>>> computing chksum starting at %d for %d bytes\n", hdr_len, data_len );
+       sv = sum( tok, data_len );                      // compute checksum of data portion
+
+       if( sv != ex_sv ) {
+               fprintf( stderr, "[FAIL] data checksum mismatch, got %d, expected %d. header: %s\n", sv, ex_sv, ohdr );
+               free( ohdr );
+               return 0;
+       }
+       
+       free( ohdr );
+       return 1;       
+}
+
+
+#endif
diff --git a/test/app_test/v_sender.c b/test/app_test/v_sender.c
new file mode 100644 (file)
index 0000000..3d53204
--- /dev/null
@@ -0,0 +1,275 @@
+// vim: ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       sender.c
+       Abstract:       This version of the sender will perform verification on response
+                               messages received back from the receiver.
+
+                               It is expected that the response messages are created with the 
+                               functions in the test_support module so that they can easily be
+                               vetted here.
+
+                               This sender is designed only to test the functionality of message
+                               routing, specifically the ability for a receiver to reallocate a
+                               message to handle a larger payload without losing the ability to
+                               use rmr_rts_msg(); no attempt has been made to be efficent, and
+                               this sender should not be used for performance tests.
+
+       Date:           28 October 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+
+#include <rmr/rmr.h>
+
+
+#define HDR_SIZE 64                                                    // the size of the header we place into the message
+#define MSG_SIZE 256                                           // toal message size sent via RMR (hdr+data)
+#define DATA_SIZE (MSG_SIZE-HDR_SIZE)          // the actual 'data' length returned in the ack msg
+
+#ifndef DEBUG
+#define DEBUG 0
+#endif
+
+#include "test_support.c"
+
+int main( int argc, char** argv ) {
+       void* mrc;                                                      // msg router context
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
+       int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
+       int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
+       int             nready;                                                 // number of events ready for receive
+       rmr_mbuf_t*             sbuf;                                   // send buffer
+       rmr_mbuf_t*             rbuf;                                   // received buffer
+       char*   ch;
+       int             count = 0;
+       int             short_count = 0;                                // number of acks that didn't seem to have a bigger payload
+       int             rt_count = 0;                                   // number of messages requiring a spin retry
+       int             rcvd_count = 0;
+       int             rts_ok = 0;                                             // number received with our tag
+       int             fail_count = 0;                                 // # of failure sends after first successful send
+       char*   listen_port = "43086";
+       int             mtype = 0;
+       int             stats_freq = 100;
+       int             successful = 0;                                 // set to true after we have a successful send
+       char    wbuf[DATA_SIZE];
+       char    me[128];                                                // who I am to vet rts was actually from me
+       char    trace[1024];
+       long    timeout = 0;
+       long    rep_timeout = 0;                                // report/stats timeout
+       int             delay = 100000;                                 // usec between send attempts
+       int             nmsgs = 10;                                             // number of messages to send
+       int             max_mt = 10;                                    // reset point for message type
+       int             start_mt = 0;
+       int             pass = 1;
+       int             need;
+
+       if( argc > 1 ) {
+               nmsgs = atoi( argv[1] );
+       }
+       if( argc > 2 ) {
+               delay = atoi( argv[2] );
+       }
+       if( argc > 3 ) {
+               if( (ch = strchr( argv[3], ':' )) != NULL ) {
+                       max_mt = atoi( ch+1 );
+                       start_mt = atoi( argv[3] );
+               } else {
+                       max_mt = atoi( argv[3] );
+               }
+       }
+       if( argc > 4 ) {
+               listen_port = argv[4];
+       }
+
+       mtype = start_mt;
+
+       fprintf( stderr, "<VSNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+       if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
+               fprintf( stderr, "<VSNDR> unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                    // epoll only available from NNG -- skip receive later if not NNG
+               if( rcv_fd < 0 ) {
+                       fprintf( stderr, "<VSNDR> unable to set up polling fd\n" );
+                       exit( 1 );
+               }
+               if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+                       fprintf( stderr, "<VSNDR> [FAIL] unable to create epoll fd: %d\n", errno );
+                       exit( 1 );
+               }
+               epe.events = EPOLLIN;
+               epe.data.fd = rcv_fd;
+
+               if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+                       fprintf( stderr, "<VSNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+                       exit( 1 );
+               }
+       } else {
+               fprintf( stderr, "<VSNDR> abort: epoll not supported, can't listen for messages\n" );   
+       }
+
+       sbuf = rmr_alloc_msg( mrc, MSG_SIZE );                                          // alloc first send buffer; subsequent buffers allcoated on send
+       rbuf = NULL;                                                                                            // don't need to alloc receive buffer
+
+       timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
+       while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
+               fprintf( stderr, "<VSNDR> waiting for rmr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<VSNDR> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<VSNDR> rmr is ready; starting to send\n" );
+
+       gethostname( wbuf, sizeof( wbuf ) );
+       snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
+
+       while( count < nmsgs ) {                                                                                // we send n messages after the first message is successful
+               snprintf( wbuf, DATA_SIZE, "Don't shoot the messaging library if you don't like what was in the payload (%d)", rand() );        // add random to change chksum
+               need = generate_header( sbuf->payload, wbuf, 0, 0 );            // generate the header directly into the message payload
+               if( need > MSG_SIZE )                   {                                                       // we have a static size for sending; abort if it would be busted
+                       fprintf( stderr, "[CRIT] abort: need for send payload size is > than allocated: %d\n", need );
+                       exit( 1 );
+               }
+
+               memcpy( sbuf->payload + HDR_SIZE, wbuf, DATA_SIZE );            // copy in our data (probably not the full amount of bytes
+               sbuf->mtype = mtype;                                                                            // fill in the message metadata
+               if( mtype < 3 ) {
+                       sbuf->sub_id = mtype * 10;
+               } else {
+                       sbuf->sub_id = -1;
+               }
+
+               sbuf->len =  need;
+               sbuf->state = 0;
+               sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+               switch( sbuf->state ) {
+                       case RMR_ERR_RETRY:
+                               rt_count++;
+                               while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) {                       // soft failure (device busy?) retry
+                                       sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
+                               }
+                               if( sbuf->state == RMR_OK ) {
+                                       successful = 1;                                                         // indicates only that we sent one successful message, not the current state
+                               } else {
+                                       if( successful ) {
+                                               fail_count++;                                                   // count failures after first successful message
+                                       }
+                                       if( fail_count > 10 ) {
+                                               fprintf( stderr, "too many failures\n" );
+                                               exit( 1 );
+                                       }
+                               }
+                               break;
+
+                       case RMR_OK:
+                               successful = 1;
+                               break;
+
+                       default:
+                               if( successful ) {
+                                       fail_count++;                                                   // count failures after first successful message
+                               }
+                               // some error (not connected likely), don't count this
+                               //sleep( 1 );
+                               break;
+               }
+
+               if( successful ) {                              // once we have a message that was sent, start to increase things
+                       count++;
+                       mtype++;
+                       if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
+                               mtype = start_mt;
+                       }
+               }
+
+               if( rcv_fd >= 0 ) {
+                       while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
+                               if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
+                                       errno = 0;
+                                       rbuf = rmr_rcv_msg( mrc, rbuf );
+                                       if( rbuf && rbuf->state == RMR_OK ) {
+                                               if( rmr_payload_size( rbuf ) > HDR_SIZE+DATA_SIZE ) {           // verify that response has a larger payload than we should have sent
+                                                       rts_ok += validate_msg( rbuf->payload, rbuf->len );
+                                               } else { 
+                                                       short_count++;
+                                               }
+                                               rcvd_count++;
+                                       }
+                               }
+                       }
+               }
+
+               if( time( NULL ) > rep_timeout ) {
+                       fprintf( stderr, "<VSNDR> sent=%d  rcvd=%d  ok_acks=%d short_acks=%d send_fails=%d retries=%d\n", count, rcvd_count, rts_ok, short_count, fail_count, rt_count );
+
+                       rep_timeout = time( NULL ) + 5;
+               }
+
+               if( delay > 0 ) {
+                       usleep( delay );
+               }
+       }
+
+       timeout = time( NULL ) + 2;                             // allow 2 seconds for the pipe to drain from the receiver
+       while( time( NULL ) < timeout ) {
+               if( rcv_fd >= 0 ) {
+                       while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {
+                               if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
+                                       errno = 0;
+                                       rbuf = rmr_rcv_msg( mrc, rbuf );
+                                       if( rbuf && rbuf->state == RMR_OK ) {
+                                               rcvd_count++;
+                                               if( rmr_payload_size( rbuf ) > HDR_SIZE+DATA_SIZE ) {           // verify that response has a larger payload than we should have sent
+                                                       rts_ok += validate_msg( rbuf->payload, rbuf->len );
+                                               }
+
+                                               timeout = time( NULL ) + 2;
+                                       }
+                               }
+                       }
+               }
+       }
+
+       if( rcvd_count != rts_ok || count != nmsgs ) {                  // we might not receive all back if receiver didn't retry, so that is NOT a failure here
+               pass = 0;
+       }
+
+       fprintf( stderr, "<VSNDR> [%s] sent=%d  rcvd=%d  rts-ok=%d failures=%d retries=%d\n", pass ? "PASS" : "FAIL",  count, rcvd_count, rts_ok, fail_count, rt_count );
+       rmr_close( mrc );
+
+       return !pass;
+}
+
index 936c43a..656287d 100644 (file)
@@ -40,6 +40,7 @@
 #include "rmr.h"
 #include "rmr_agnostic.h"
 
+// ---------------------------------------------------------------------------------------------
 
 int mbuf_api_test( ) {
        unsigned char* c;
@@ -49,6 +50,8 @@ int mbuf_api_test( ) {
        char*   buf;
        void*   ptr;
        rmr_mbuf_t*     mbuf;
+       rmr_mbuf_t*     mbuf2;
+       rmr_mbuf_t*     mbuf3;
        uta_mhdr_t*     hdr;
        unsigned char src_buf[256];
        unsigned char dest_buf[256];
@@ -68,7 +71,7 @@ int mbuf_api_test( ) {
        mbuf->header = mbuf->tp_buf;
        mbuf->alloc_len = 1024;
        mbuf->payload = PAYLOAD_ADDR( mbuf->header );
-       hdr = (rmr_mbuf_t *) mbuf->header;
+       hdr = (uta_mhdr_t *) mbuf->header;
        mbuf->xaction = hdr->xid;
 
 
@@ -346,5 +349,6 @@ int mbuf_api_test( ) {
        test_set_ver( mbuf, 2 );                                                        // set older message version to ensure properly handled
        buf = rmr_get_srcip( mbuf, src_buf );
 
+
        return errors > 0;                      // overall exit code bad if errors
 }
index f08c2f1..6588907 100644 (file)
@@ -274,6 +274,8 @@ static int rmr_api_test( ) {
        snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
        msg->mtype = 0;
        msg->sub_id = -1;
+       em_set_rcvcount( 0 );                                                   // reset message counter
+       em_set_rcvdelay( 1 );                                                   // force slow msg rate during mt testing
        msg = rmr_call( rmc, msg );                                             // dummy nng/nano function will sequentually add xactions and should match or '16'
        errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed "  );
        if( msg ) {
@@ -444,15 +446,15 @@ static int rmr_api_test( ) {
                errors += fail_if_nil( msg, "mt_call with known xaction id returned nil message" );
        }
        fprintf( stderr, "<INFO> time check: %ld\n", time( NULL ) );
-               
 
        em_set_mtc_msgs( 0 );                                                   // turn off 
-       em_set_rcvdelay( 0 );                                                   // full speed receive rate
-       ((uta_ctx_t *)rmc)->shutdown = 1;                               // force the mt-reciver attached to the context to stop
+       em_set_rcvdelay( 0 );                                                   // full speed receive rate to overflow the ring
 
-       em_set_rcvdelay( 0 );                                                   // let the receive loop spin w/o receives so we drive warning code about queue full
-       sleep( 5 );
+       fprintf( stderr, "<INFO> pausing 5s to allow mt-call receive ring to fill  %ld\n", time( NULL ) );
+       sleep( 2 );
+       fprintf( stderr, "<INFO> tests continuing %ld\n", time( NULL ) );
        em_set_rcvdelay( 1 );                                                   // restore slow receive pace for any later tests
+       ((uta_ctx_t *)rmc)->shutdown = 1;                               // force the mt-reciver attached to the context to stop
 #endif
 
 
index 7c44432..168289d 100644 (file)
 #include "rmr.h"
 #include "rmr_agnostic.h"
 
+// ----------- local test support ----------------------------------------------------------
+#define CLONE  1                       // convenience constants for payload realloc tests
+#define NO_CLONE 0
+#define COPY 1
+#define NO_COPY 0
+
 /*
        Generate a simple route table (for all but direct route table testing).
        This gets tricky inasmuch as we generate two in one; first a whole table 
@@ -113,7 +119,7 @@ static int sr_nng_test() {
        uta_ctx_t*      real_ctx;       // real one to force odd situations for error testing
        int errors = 0;                 // number errors found
        rmr_mbuf_t*     mbuf;           // mbuf to send/receive
-       rmr_mbuf_t*     mb2;            // error capturing msg buf
+       rmr_mbuf_t*     mb2;            // second mbuf when needed
        int             whid = -1;
        int             last_whid;
        int     state;
@@ -121,6 +127,7 @@ static int sr_nng_test() {
        int             size;
        int             i;
        void*   p;
+       char*   payload_str;
 
        //ctx = rmr_init( "tcp:4360", 2048, 0 );                                // do NOT call init -- that starts the rtc thread which isn't good here
        ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) );              // alloc the context manually
@@ -219,5 +226,113 @@ static int sr_nng_test() {
        rtc( NULL );                            // coverage test with nil pointer
        rtc( ctx );
 
+       setenv( "RMR_RTG_SVC", "4567", 1 );             // drive for edge case coverage to ensure no nil pointer etc
+       rtc( ctx );
+       setenv( "RMR_RTG_SVC", "tcp:4567", 1 );
+       rtc( ctx );
+       setenv( "RMR_RTG_SVC", "tcp:4567:error", 1 );
+       rtc( ctx );
+
+       // ------------- reallocation tests ------------------------------------------------------------
+       // we use mk_populated_msg() to create a message with mid/sid/plen pushed into the transport
+       // header to simulate a message having been sent and received which is what causes this data
+       // to push into the wire packet.
+
+       payload_str = "Stand Up and Cheer; OU78-82";
+
+
+       mbuf = mk_populated_msg( 1024, 0, 99, 100, strlen( payload_str ) );
+       memcpy( mbuf->payload, payload_str, mbuf->len );
+       mb2 = realloc_payload( mbuf, 512, NO_COPY, NO_CLONE );
+       errors += fail_if_nil( mb2, "realloc_payload (no copy) returned a nil pointer when reallocating with smaller size" );
+       errors += fail_if_false( mbuf == mb2, "non-clone realloc payload (no copy) of smaller size did not return the same buffer" );
+
+       mb2 = realloc_payload( NULL, 512, NO_COPY, NO_CLONE );
+       errors += fail_not_nil( mb2, "realloc payload did not return nil pointer when passed nil mbuf" );
+
+       mb2 = realloc_payload( mbuf, 0, NO_COPY, NO_CLONE );
+       errors += fail_not_nil( mb2, "realloc payload did not return nil pointer when passed bad len" );
+
+       fprintf( stderr, "<TEST> no copy/no clone test starts\n" );
+       mb2 = realloc_payload( mbuf, 2048, NO_COPY, NO_CLONE );
+       errors += fail_if_false( mbuf == mb2, "realloc payload (no copy) of larger size did not return the same msg buffer(1)" );
+       errors += fail_not_equal( mb2->mtype, -1, "realloc payload (no copy) did not reset mtype(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->sub_id, -1, "realloc payload (no copy) did not reset sub-id(a) to expected(b) value" );
+       errors += fail_if_nil( mb2, "realloc payload returned (no copy) a nil pointer when increasing payload len" );
+       errors += fail_not_equal( mb2->len, 0, "realloc payload payload len(a) not expected(b):" );
+       errors += fail_not_equal( rmr_payload_size( mb2), 2048, "realloc payload alloc len(a) not expected(b)" );
+
+       fprintf( stderr, "<TEST> copy/no clone test starts\n" );
+       mbuf = mk_populated_msg( 1024, 0, 99, 100, strlen( payload_str ) );
+       memcpy( mbuf->payload, payload_str, mbuf->len );
+       mb2 = realloc_payload( mbuf, 2048, COPY, NO_CLONE );
+       errors += fail_if_false( mbuf == mb2, "non-clone realloc payload (copy) of larger size did not return the same msg buffer(2)" );
+       errors += fail_if_nil( mb2, "realloc payload (copy) returned a nil pointer when increasing payload len)" );
+       errors += fail_not_equal( mb2->mtype, 99, "realloc payload (copy) did not reset mtype(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->sub_id, 100, "realloc payload (copy) did not reset sub-id(a) to expected(b) value" );
+       errors += fail_if_equal( mb2->len, 0, "realloc payload (copy) msg len(a) not expected(b)" );
+       errors += fail_not_equal( rmr_payload_size( mb2), 2048, "realloc payload (copy) alloc len(a) not expected(b)" );
+       errors += fail_not_equal( strncmp( payload_str, mb2->payload, strlen( payload_str )), 0, "realloc payload(copy) didn't copy payload" );
+
+       fprintf( stderr, "<TEST> copy/clone test starts requested buffer smaller than original\n" );
+       mbuf = mk_populated_msg( 1024, 0, 99, 100, strlen( payload_str ) );
+       memcpy( mbuf->payload, payload_str, mbuf->len );
+       mb2 = realloc_payload( mbuf, 512, COPY, CLONE );
+       errors += fail_if_true( mbuf == mb2, "realloc payload (clone+copy) of larger size did not return different message buffers" );
+       errors += fail_if_nil( mb2, "realloc payload (clone+copy) returned a nil pointer when increasing payload len)" );
+       errors += fail_not_equal( mb2->mtype, 99, "realloc payload (clone+copy) did not reset mtype(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->sub_id, 100, "realloc payload (clone+copy) did not reset sub-id(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->len, strlen( payload_str ), "realloc payload (clone+copy) msg len(a) not expected(b)" );
+       errors += fail_not_equal( rmr_payload_size( mb2), 1024, "realloc payload (clone+copy) alloc len(a) not expected(b)" );
+       errors += fail_not_equal( strncmp( payload_str, mb2->payload, strlen( payload_str )), 0, "realloc payload(clone+copy) didn't copy payload" );
+
+       // with a clone, we must verify that original message looks sane too
+       errors += fail_not_equal( mbuf->mtype, 99, "realloc payload (clone+copy) validation of unchanged mbuf->mtype fails" );
+       errors += fail_not_equal( mbuf->sub_id, 100, "realloc payload (clone+copy) validation of unchanged mbuf->subid fails" );
+       errors += fail_not_equal( mbuf->len, strlen( payload_str ), "realloc payload (clone+copy) validation of unchanged payload len fails" );
+       errors += fail_not_equal( rmr_payload_size( mbuf ), 1024, "realloc payload (clone+copy) validation of unchanged alloc length fails" );
+       errors += fail_not_equal( strncmp( payload_str, mbuf->payload, strlen( payload_str )), 0, "realloc payload(clone+copy) validation of unchanged payload fails" );
+
+
+       fprintf( stderr, "<TEST> copy/clone test starts requested buf is larger than original\n" );
+       mbuf = mk_populated_msg( 1024, 0, 99, 100, strlen( payload_str ) );
+       memcpy( mbuf->payload, payload_str, mbuf->len );
+       mb2 = realloc_payload( mbuf, 2048, COPY, CLONE );
+       errors += fail_if_true( mbuf == mb2, "realloc payload(clone+copy/lg) of larger size did not return different message buffers" );
+       errors += fail_if_nil( mb2, "realloc payload (clone+copy/lg) returned a nil pointer when increasing payload len)" );
+       errors += fail_not_equal( mb2->mtype, 99, "realloc payload (clone+copy/lg) did not reset mtype(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->sub_id, 100, "realloc payload (clone+copy/lg) did not reset sub-id(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->len, strlen( payload_str ), "realloc payload (clone+copy/lg) msg len(a) not expected(b)" );
+       errors += fail_not_equal( rmr_payload_size( mb2), 2048, "realloc payload (clone+copy/lg) alloc len(a) not expected(b)" );
+       errors += fail_not_equal( strncmp( payload_str, mb2->payload, strlen( payload_str )), 0, "realloc payload(clone+copy/lg) didn't copy payload" );
+
+       // with a clone, we must verify that original message looks sane too
+       errors += fail_not_equal( mbuf->mtype, 99, "realloc payload (clone+copy/lg) validation of unchanged mbuf->mtype fails" );
+       errors += fail_not_equal( mbuf->sub_id, 100, "realloc payload (clone+copy/lg) validation of unchanged mbuf->subid fails" );
+       errors += fail_not_equal( mbuf->len, strlen( payload_str ), "realloc payload (clone+copy/lg) validation of unchanged payload len fails" );
+       errors += fail_not_equal( rmr_payload_size( mbuf ), 1024, "realloc payload (clone+copy/lg) validation of unchanged alloc length fails" );
+       errors += fail_not_equal( strncmp( payload_str, mbuf->payload, strlen( payload_str )), 0, "realloc payload(clone+copy/lg) validation of unchanged payload fails" );
+
+       // original message should be unharmed, and new message should have no type/sid or payload len; total alloc len should be requested enlargement
+       fprintf( stderr, "<TEST> no copy/clone test starts requested buf is larger than original\n" );
+       mbuf = mk_populated_msg( 1024, 0, 99, 100, strlen( payload_str ) );
+       memcpy( mbuf->payload, payload_str, mbuf->len );
+       mb2 = realloc_payload( mbuf, 2048, NO_COPY, CLONE );
+       errors += fail_if_true( mbuf == mb2, "realloc payload (clone+nocopy) of larger size did not return different message buffers" );
+       errors += fail_if_nil( mb2, "realloc payload (clone+nocopy) returned a nil pointer when increasing payload len)" );
+       errors += fail_not_equal( mb2->mtype, -1, "realloc payload (clone+nocopy) did not reset mtype(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->sub_id, -1, "realloc payload (clone+nocopy) did not reset sub-id(a) to expected(b) value" );
+       errors += fail_not_equal( mb2->len, 0, "realloc payload (clone+nocopy) msg len(a) not expected(b)" );
+       errors += fail_not_equal( rmr_payload_size( mb2 ), 2048, "realloc payload (clone+nocopy) alloc len(a) not expected(b)" );
+       errors += fail_if_equal( strncmp( payload_str, mb2->payload, strlen( payload_str )), 0, "realloc payload(clone+nocopy) copied payload when not supposed to" );
+
+       // with a clone, we must verify that original message looks sane too
+       errors += fail_not_equal( mbuf->mtype, 99, "realloc payload (clone+nocopy) validation of unchanged mbuf->mtype fails" );
+       errors += fail_not_equal( mbuf->sub_id, 100, "realloc payload (clone+nocopy) validation of unchanged mbuf->subid fails" );
+       errors += fail_not_equal( mbuf->len, strlen( payload_str ), "realloc payload (clone+nocopy) validation of unchanged payload len fails" );
+       errors += fail_not_equal( rmr_payload_size( mbuf ), 1024, "realloc payload (clone+nocopy) validation of unchanged alloc length fails" );
+       errors += fail_not_equal( strncmp( payload_str, mbuf->payload, strlen( payload_str )), 0, "realloc payload (clone+nocopy) validation of unchanged payload fails" );
+
+
        return !!errors;
 }
index 8c6a9ca..fc60301 100644 (file)
@@ -190,6 +190,11 @@ static void em_set_rcvcount( int v ) {
 }
 
 static void em_set_rcvdelay( int v ) {
+       if( v < 0 ) {
+               fprintf( stderr, "<EM>   ##ERR## attempt to set receive delay with invalid value was ignored: %d seconds\n", v );
+               return;
+       }
+       fprintf( stderr, "<EM>   receive delay is now %d seconds\n", v );
        rcv_delay = v;
 }
 
index 08254a0..084256f 100644 (file)
@@ -153,6 +153,7 @@ static int fail_if_equalp( void* a, void* b, char* what ) {
 }
 
 
+// for symtab and other non-message things this allows them to exclude by setting
 #ifndef NO_DUMMY_RMR
 /*
        Dummy message allocator for testing without sr_static functions
@@ -166,7 +167,7 @@ static rmr_mbuf_t* test_mk_msg( int len, int tr_len ) {
        uta_mhdr_t* hdr;
        size_t  alen;
 
-       alen = sizeof( *hdr ) + tr_len + len;
+       alen = sizeof( *hdr ) + tr_len + len;   // this does no support allocating len2 and len3 data fields
 
        new_msg = (rmr_mbuf_t *) malloc( sizeof *new_msg );
        new_msg->tp_buf = (void *) malloc( alen );
@@ -197,7 +198,55 @@ static void test_set_ver( rmr_mbuf_t* msg, int ver ) {
 
        return;
 }
-#endif
 
+/*
+       These allow values to be pushed deep into the real RMR header allocated
+       at the front of the transport buffer. These are needed to simulate
+       the actions of rmr_send() which pushes the values from the message buffer
+       just before putting them on the wire.
+*/
+static void test_set_mtype( rmr_mbuf_t* msg, int mtype ) {
+       uta_mhdr_t* hdr;
+
+       msg->mtype = mtype;
+       hdr = (uta_mhdr_t*) msg->tp_buf;
+       hdr->mtype = htonl( mtype );
+}
+
+static void test_set_sid( rmr_mbuf_t* msg, int sid ) {
+       uta_mhdr_t* hdr;
+
+       msg->sub_id = sid;
+       hdr = (uta_mhdr_t*) msg->tp_buf;
+       hdr->sub_id = htonl( sid );
+}
+
+static void test_set_plen( rmr_mbuf_t* msg, int plen ) {
+       uta_mhdr_t* hdr;
+
+       msg->len = plen;
+       hdr = (uta_mhdr_t*) msg->tp_buf;
+       hdr->plen = htonl( plen );
+}
+
+/*
+       Build a message and populate both the msg buffer and the tranport header
+       with mid, sid, and payload len. Tr_len causes that much space in the 
+       header for trace info to be reserved.
+*/
+static rmr_mbuf_t* mk_populated_msg( int alloc_len, int tr_len, int mtype, int sid, int plen ) {
+       uta_mhdr_t* hdr;
+       rmr_mbuf_t* mbuf;
+
+       mbuf = test_mk_msg( alloc_len, tr_len );
+       test_set_mtype( mbuf, mtype );
+       test_set_sid( mbuf, sid );
+       test_set_plen( mbuf, plen );
+
+       return mbuf;
+}
+
+
+#endif
 
 #endif
index 214f94d..9ab1827 100755 (executable)
@@ -346,6 +346,7 @@ do
                -N)     run_nano_tests=1;;
 
                -c)     module_cov_target=$2; shift;;
+               -e)     capture_file=$2; >$capture_file; shift;;                # capture errors from failed tests rather than spewing on tty
                -f)     force_discounting=1;
                        trigger_discount_str="WARN|FAIL|PASS"           # check all outcomes for each module
                        ;;
@@ -449,11 +450,20 @@ do
        if ! ./${tfile%.c} >/tmp/PID$$.log 2>&1
        then
                echo "[FAIL] unit test failed for: $tfile"
-               if (( quiet ))
+               if [[ -n capture_file ]] 
                then
-                       grep "^<" /tmp/PID$$.log        # in quiet mode just dump <...> messages which are assumed from the test programme not appl
+                       echo "all errors captured in $capture_file, listing only fail message on tty"
+                       echo "$tfile --------------------------------------" >>$capture_file
+                       cat /tmp/PID$$.log >>$capture_file
+                       grep "^<FAIL>" /tmp/PID$$.log
+                       echo ""
                else
-                       cat /tmp/PID$$.log
+                       if (( quiet ))
+                       then
+                               grep "^<" /tmp/PID$$.log|grep -v "^<EM>"        # in quiet mode just dump <...> messages which are assumed from the test programme not appl
+                       else
+                               cat /tmp/PID$$.log
+                       fi
                fi
                (( ut_errors++ ))                               # cause failure even if not in strict mode
                continue                                                # skip coverage tests for this