Fix large message bug in SI95 data callback 36/2336/1 3.0.5
authorE. Scott Daniels <daniels@research.att.com>
Fri, 24 Jan 2020 21:00:11 +0000 (16:00 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Fri, 24 Jan 2020 21:00:11 +0000 (16:00 -0500)
The receive buffer was not being allocated correctly and in some
cases would cause a segment fault when a large buffer was received.
The fix also includes dropping a message which is larger than the
allocated message.  The rmr_init() manual page has been updated to
better define the meaning of the max message length pararmeter which
is passed.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I20eb0d7ed8ba914b380807d9b3142d51d4f9f0b6

CHANGES
CMakeLists.txt
doc/src/man/rmr_init.3.xfm
docs/rel-notes.rst
docs/user-guide.rst
src/rmr/si/include/rmr_si_private.h
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c

diff --git a/CHANGES b/CHANGES
index f3d5c77..249d2cf 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 January 24; verison 3.0.5
+       Fix bug in SI95 with receive buffer allocation.
+
 2020 January 23; verison 3.0.4
        Fix bug in SI95 causing excessive CPU usage on poll.
 
index 89f5cb9..981b158 100644 (file)
@@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "4" )
+set( patch_level "5" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 5413beb..5e53e63 100644 (file)
@@ -1,7 +1,7 @@
 .if false
 ==================================================================================
-       Copyright (c) 2019 Nokia 
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia 
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -51,9 +51,15 @@ send messages.
 
 &space
 &ital(Port) is used to listen for connection requests from other RMR based applications.
-The value of &ital(max_msg_size) will be used when allocating zero copy send buffers
-which must be allocated, possibly, prior to the application knowing the actual size of
-a specific message. 
+The &ital(max_msg_size) parameter is used to allocate receive buffers and is the
+maximum message size which the application expects to receive. 
+This value is the sum of &bold(both) the maximum payload size &bold(and) the maximum
+trace data size. 
+This value is also used as the default message size when allocating message buffers.
+Messages arriving which are longer than the given maximum will be dropped without 
+notification to the application. 
+A warning is written to standard error for the first message which is too large on
+each connection.
 
 &space
 &ital(Flags) allows for selection of some RMr options at the time of initialisation. 
@@ -73,6 +79,15 @@ following flags are supported:
 &half_space
 &diitem(RMRFL_MTCALL)
        Enable multi-threaded call support. 
+
+&half_space
+&ditem(RMRFL_NOLOCK)
+       Some underlying transport providers (e.g. SI95) enable locking to be turned off
+       if the user application is single threaded, or otherwise can guarantee that RMR
+       functions will not be invoked concurrently from different threads. Turning off
+       locking can help make message receipt more efficient. 
+       If this flag is set when the underlying transport does not support disabling
+       locks, it will be ignored.
 &end_dlist
 
 &h3(Multi-threaded Calling)
@@ -81,7 +96,7 @@ was limited such that only user applications which were operating in a single th
 could safely use the function.
 Further, timeouts were message count based and not time unit based. 
 Multi-threaded call support adds the ability for a user application with multiple threads
-to invoke a blocking call function with the guarentee that the correct response message 
+to invoke a blocking call function with the guarantee that the correct response message 
 is delivered to the thread.  
 The additional support is implemented with the &ital(rmr_mt_call()) and &ital(rmr_mt_rcv())
 function calls. 
index 35c181f..a9b2ab7 100644 (file)
@@ -15,6 +15,46 @@ file at the repo root; please refer to that file for a
 completely up to date listing of API changes. 
  
  
+2020 January 24; verison 3.0.5 
+-------------------------------------------------------------------------------------------- 
+Fix bug in SI95 with receive buffer allocation. 
+2020 January 23; verison 3.0.4 
+-------------------------------------------------------------------------------------------- 
+Fix bug in SI95 causing excessive CPU usage on poll. 
+2020 January 22; verison 3.0.3 
+-------------------------------------------------------------------------------------------- 
+Enable thread support for multiple receive threads. 
+2020 January 21; verison 3.0.2 
+-------------------------------------------------------------------------------------------- 
+Fix bug in SI95 (missing reallocate payload function). 
+2020 January 20; verison 3.0.1 
+-------------------------------------------------------------------------------------------- 
+Enable support for dynamic route table updates via RMR 
+session. 
+2020 January 16; version 3.0.0 
+-------------------------------------------------------------------------------------------- 
+Introduce support for SI95 transport library to replace NNG. 
+(RMR library versions will use leading odd numbers to avoid 
+tag collisions with the wrapper tags which will use even 
+numbers.) 
 2019 December 9; version 1.13.1 
 -------------------------------------------------------------------------------------------- 
  
index ad62076..c0b7bc8 100644 (file)
@@ -1180,10 +1180,16 @@ which provides the necessary routing information for the RMR
 library to send messages. 
  
 *Port* is used to listen for connection requests from other 
-RMR based applications. The value of *max_msg_size* will be 
-used when allocating zero copy send buffers which must be 
-allocated, possibly, prior to the application knowing the 
-actual size of a specific message. 
+RMR based applications. The *max_msg_size* parameter is used 
+to allocate receive buffers and is the maximum message size 
+which the application expects to receive. This value is the 
+sum of **both** the maximum payload size **and** the maximum 
+trace data size. This value is also used as the default 
+message size when allocating message buffers. Messages 
+arriving which are longer than the given maximum will be 
+dropped without notification to the application. A warning is 
+written to standard error for the first message which is too 
+large on each connection. 
  
 *Flags* allows for selection of some RMr options at the time 
 of initialisation. These are set by ORing RMRFL constants 
@@ -1207,6 +1213,15 @@ RMRFL_NOTHREAD
 RMRFL_MTCALL 
    
   Enable multi-threaded call support. 
+   
+  &ditem Some underlying transport providers (e.g. SI95) 
+  enable locking to be turned off if the user application is 
+  single threaded, or otherwise can guarantee that RMR 
+  functions will not be invoked concurrently from different 
+  threads. Turning off locking can help make message receipt 
+  more efficient. If this flag is set when the underlying 
+  transport does not support disabling locks, it will be 
+  ignored. 
  
  
 Multi-threaded Calling 
@@ -1218,7 +1233,7 @@ applications which were operating in a single thread could
 safely use the function. Further, timeouts were message count 
 based and not time unit based. Multi-threaded call support 
 adds the ability for a user application with multiple threads 
-to invoke a blocking call function with the guarentee that 
+to invoke a blocking call function with the guarantee that 
 the correct response message is delivered to the thread. The 
 additional support is implemented with the *rmr_mt_call()* 
 and *rmr_mt_rcv()* function calls. 
@@ -2463,7 +2478,7 @@ RETURN VALUE
 On success, a new message buffer, with an empty payload, is 
 returned for the application to use for the next send. The 
 state in this buffer will reflect the overall send operation 
-state and should be RMR_OK
+state and will be RMR_OK when the send was successful
  
 When the message cannot be successfully sent this function 
 will return the unsent (original) message buffer with the 
@@ -2476,6 +2491,15 @@ In this case the value of errno might be of some use, for
 documentation, but there will be little that the user 
 application can do other than to move on. 
  
+**CAUTION:** In some cases it is extremely likely that the 
+message returned by the send function does **not** reference 
+the same memory structure. Thus is important for the user 
+programme to capture the new pointer for future use or to be 
+passed to rmr_free(). If you are experiencing either double 
+free errors or segment faults in either rmr_free() or 
+rmr_send_msg(), ensure that the return value from this 
+function is being captured and used. 
 ERRORS 
 -------------------------------------------------------------------------------------------- 
  
@@ -2605,19 +2629,18 @@ cycles.
       }
       // reference payload and fill in message type
      pm = (msg_t*) send_msg->payload;
-     send_msg->mtype = MT_ANSWER;    
-     msg->len = generate_data( pm );         e// something that fills the payload in
-     msg = rmr_send_msg( mr, send_msg );
+     send_msg->mtype = MT_ANSWER;
+     msg->len = generate_data( pm );       // something that fills the payload in
+     msg = rmr_send_msg( mr, send_msg );   // ensure new pointer used after send
      mif( ! msg ) {
      m    !return ERROR;
      m} else {
      m    sif( msg->state != RMR_OK ) {
-     m    s    m// check for eagain, and resend if needed
+     m    s    m// check for RMR_ERR_RETRY, and resend if needed
      m    s    m// else return error
      m    s}
      m}
      mreturn OK;
-     m    r    ;
  
  
  
index 132873c..c816486 100644 (file)
@@ -48,6 +48,9 @@
 #define RS_GOOD                1               // flow is in progress
 #define RS_RESET       2               // flow was interrupted; reset on next receive
 
+#define RF_NOTIFIED    0x01    // notification made about river issue
+#define RF_DROP                0x02    // this message is large and being dropped
+
 #define SI_MAX_ADDR_LEN                512
 
 /*
@@ -58,9 +61,8 @@ typedef struct {
        char*   accum;          // bytes being accumulated
        int             nbytes;         // allocated size of accumulator
        int             ipt;            // insertion point in accumulator
-       //int           max;            // size of accum
-       //int           expected;       // expected for a complete message
        int             msg_size;       // size of the message being accumulated
+       int             flags;          // RF_* constants
 } river_t;
 
 
index fa603ff..55636bb 100644 (file)
@@ -185,20 +185,33 @@ fprintf( stderr, "\n" );
                                river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
                        }
                        if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+
+                       if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
+                               river->flags |= RF_DROP;
+                       }
                }
 
                if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
                        if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
-                       memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       if( (river->flags & RF_DROP) == 0  ) {
+                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       }
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
                        if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
-                       memcpy( &river->accum[river->ipt], buf+bidx, need );            // grab just what is needed (might be more)
-                       buf2mbuf( ctx, river->accum, river->msg_size, fd );                             // build an RMR mbuf and queue
+                       if( (river->flags & RF_DROP) == 0  ) {
+                               memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
+                               buf2mbuf( ctx, river->accum, river->msg_size, fd );                                     // build an RMR mbuf and queue
+                               river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
+                       } else {
+                               if( !(river->flags & RF_NOTIFIED) ) {   
+                                       fprintf( stderr, "[WRN] message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+                                       river->flags |= RF_NOTIFIED;
+                               }
+                       }
 
-                       river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
index c8c3b4a..e50b5d0 100644 (file)
@@ -522,6 +522,10 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
        invokes this.  Internal functions (the route table collector) which need additional
        open ports without starting additional route table collectors, will invoke this
        directly with the proper flag.
+
+       CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
+                               that we know about. The _user_ should ensure that the supplied length also
+                               includes the trace data length maximum as they are in control of that.
 */
 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        static  int announced = 0;
@@ -567,7 +571,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
-       ctx->max_ibm = max_msg_size;                                    // default to user supplied message size
+       ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
+       ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64;          // add in our header size and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
        ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls