From a1575dacc478b945ea63f5d0cc3db3d66dcb5983 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Fri, 24 Jan 2020 16:00:11 -0500 Subject: [PATCH] Fix large message bug in SI95 data callback The receive buffer was not being allocated correctly and in some cases would cause a segment fault when a large buffer was received. The fix also includes dropping a message which is larger than the allocated message. The rmr_init() manual page has been updated to better define the meaning of the max message length parameter which is passed. Signed-off-by: E. Scott Daniels Change-Id: I20eb0d7ed8ba914b380807d9b3142d51d4f9f0b6 --- CHANGES | 3 +++ CMakeLists.txt | 2 +- doc/src/man/rmr_init.3.xfm | 27 +++++++++++++++++----- docs/rel-notes.rst | 40 +++++++++++++++++++++++++++++++++ docs/user-guide.rst | 45 ++++++++++++++++++++++++++++--------- src/rmr/si/include/rmr_si_private.h | 6 +++-- src/rmr/si/src/mt_call_si_static.c | 21 +++++++++++++---- src/rmr/si/src/rmr_si.c | 7 +++++- 8 files changed, 126 insertions(+), 25 deletions(-) diff --git a/CHANGES b/CHANGES index f3d5c77..249d2cf 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,9 @@ API and build change and fix summaries. Doc correctsions and/or changes are not mentioned here; see the commit messages. +2020 January 24; version 3.0.5 + Fix bug in SI95 with receive buffer allocation. + 2020 January 23; verison 3.0.4 Fix bug in SI95 causing excessive CPU usage on poll. 
diff --git a/CMakeLists.txt b/CMakeLists.txt index 89f5cb9..981b158 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "3" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "4" ) +set( patch_level "5" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_inc "include/rmr" ) diff --git a/doc/src/man/rmr_init.3.xfm b/doc/src/man/rmr_init.3.xfm index 5413beb..5e53e63 100644 --- a/doc/src/man/rmr_init.3.xfm +++ b/doc/src/man/rmr_init.3.xfm @@ -1,7 +1,7 @@ .if false ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 Nokia + Copyright (c) 2018-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -51,9 +51,15 @@ send messages. &space &ital(Port) is used to listen for connection requests from other RMR based applications. -The value of &ital(max_msg_size) will be used when allocating zero copy send buffers -which must be allocated, possibly, prior to the application knowing the actual size of -a specific message. +The &ital(max_msg_size) parameter is used to allocate receive buffers and is the +maximum message size which the application expects to receive. +This value is the sum of &bold(both) the maximum payload size &bold(and) the maximum +trace data size. +This value is also used as the default message size when allocating message buffers. +Messages arriving which are longer than the given maximum will be dropped without +notification to the application. +A warning is written to standard error for the first message which is too large on +each connection. &space &ital(Flags) allows for selection of some RMr options at the time of initialisation. 
@@ -73,6 +79,15 @@ following flags are supported: &half_space &diitem(RMRFL_MTCALL) Enable multi-threaded call support. + +&half_space +&diitem(RMRFL_NOLOCK) + Some underlying transport providers (e.g. SI95) enable locking to be turned off + if the user application is single threaded, or otherwise can guarantee that RMR + functions will not be invoked concurrently from different threads. Turning off + locking can help make message receipt more efficient. + If this flag is set when the underlying transport does not support disabling + locks, it will be ignored. &end_dlist &h3(Multi-threaded Calling) @@ -81,7 +96,7 @@ was limited such that only user applications which were operating in a single th could safely use the function. Further, timeouts were message count based and not time unit based. Multi-threaded call support adds the ability for a user application with multiple threads -to invoke a blocking call function with the guarentee that the correct response message +to invoke a blocking call function with the guarantee that the correct response message is delivered to the thread. The additional support is implemented with the &ital(rmr_mt_call()) and &ital(rmr_mt_rcv()) function calls. diff --git a/docs/rel-notes.rst b/docs/rel-notes.rst index 35c181f..a9b2ab7 100644 --- a/docs/rel-notes.rst +++ b/docs/rel-notes.rst @@ -15,6 +15,46 @@ file at the repo root; please refer to that file for a completely up to date listing of API changes. +2020 January 24; version 3.0.5 +-------------------------------------------------------------------------------------------- + +Fix bug in SI95 with receive buffer allocation. + + +2020 January 23; version 3.0.4 +-------------------------------------------------------------------------------------------- + +Fix bug in SI95 causing excessive CPU usage on poll. 
+ + +2020 January 22; version 3.0.3 +-------------------------------------------------------------------------------------------- + +Enable thread support for multiple receive threads. + + +2020 January 21; version 3.0.2 +-------------------------------------------------------------------------------------------- + +Fix bug in SI95 (missing reallocate payload function). + + +2020 January 20; version 3.0.1 +-------------------------------------------------------------------------------------------- + +Enable support for dynamic route table updates via RMR +session. + + +2020 January 16; version 3.0.0 +-------------------------------------------------------------------------------------------- + +Introduce support for SI95 transport library to replace NNG. +(RMR library versions will use leading odd numbers to avoid +tag collisions with the wrapper tags which will use even +numbers.) + + 2019 December 9; version 1.13.1 -------------------------------------------------------------------------------------------- diff --git a/docs/user-guide.rst b/docs/user-guide.rst index ad62076..c0b7bc8 100644 --- a/docs/user-guide.rst +++ b/docs/user-guide.rst @@ -1180,10 +1180,16 @@ which provides the necessary routing information for the RMR library to send messages. *Port* is used to listen for connection requests from other -RMR based applications. The value of *max_msg_size* will be -used when allocating zero copy send buffers which must be -allocated, possibly, prior to the application knowing the -actual size of a specific message. +RMR based applications. The *max_msg_size* parameter is used +to allocate receive buffers and is the maximum message size +which the application expects to receive. This value is the +sum of **both** the maximum payload size **and** the maximum +trace data size. This value is also used as the default +message size when allocating message buffers. 
Messages +arriving which are longer than the given maximum will be +dropped without notification to the application. A warning is +written to standard error for the first message which is too +large on each connection. *Flags* allows for selection of some RMr options at the time of initialisation. These are set by ORing RMRFL constants @@ -1207,6 +1213,15 @@ RMRFL_NOTHREAD RMRFL_MTCALL Enable multi-threaded call support. + + &ditem Some underlying transport providers (e.g. SI95) + enable locking to be turned off if the user application is + single threaded, or otherwise can guarantee that RMR + functions will not be invoked concurrently from different + threads. Turning off locking can help make message receipt + more efficient. If this flag is set when the underlying + transport does not support disabling locks, it will be + ignored. Multi-threaded Calling @@ -1218,7 +1233,7 @@ applications which were operating in a single thread could safely use the function. Further, timeouts were message count based and not time unit based. Multi-threaded call support adds the ability for a user application with multiple threads -to invoke a blocking call function with the guarentee that +to invoke a blocking call function with the guarantee that the correct response message is delivered to the thread. The additional support is implemented with the *rmr_mt_call()* and *rmr_mt_rcv()* function calls. @@ -2463,7 +2478,7 @@ RETURN VALUE On success, a new message buffer, with an empty payload, is returned for the application to use for the next send. The state in this buffer will reflect the overall send operation -state and should be RMR_OK. +state and will be RMR_OK when the send was successful. 
When the message cannot be successfully sent this function will return the unsent (original) message buffer with the @@ -2476,6 +2491,15 @@ In this case the value of errno might be of some use, for documentation, but there will be little that the user application can do other than to move on. +**CAUTION:** In some cases it is extremely likely that the +message returned by the send function does **not** reference +the same memory structure. Thus it is important for the user +programme to capture the new pointer for future use or to be +passed to rmr_free(). If you are experiencing either double +free errors or segment faults in either rmr_free() or +rmr_send_msg(), ensure that the return value from this +function is being captured and used. + ERRORS -------------------------------------------------------------------------------------------- @@ -2605,19 +2629,18 @@ cycles. } // reference payload and fill in message type pm = (msg_t*) send_msg->payload; - send_msg->mtype = MT_ANSWER; - msg->len = generate_data( pm ); e// something that fills the payload in - msg = rmr_send_msg( mr, send_msg ); + send_msg->mtype = MT_ANSWER; + msg->len = generate_data( pm ); // something that fills the payload in + msg = rmr_send_msg( mr, send_msg ); // ensure new pointer used after send mif( ! 
msg ) { m !return ERROR; m} else { m sif( msg->state != RMR_OK ) { - m s m// check for eagain, and resend if needed + m s m// check for RMR_ERR_RETRY, and resend if needed m s m// else return error m s} m} mreturn OK; - m r ; diff --git a/src/rmr/si/include/rmr_si_private.h b/src/rmr/si/include/rmr_si_private.h index 132873c..c816486 100644 --- a/src/rmr/si/include/rmr_si_private.h +++ b/src/rmr/si/include/rmr_si_private.h @@ -48,6 +48,9 @@ #define RS_GOOD 1 // flow is in progress #define RS_RESET 2 // flow was interrupted; reset on next receive +#define RF_NOTIFIED 0x01 // notification made about river issue +#define RF_DROP 0x02 // this message is large and being dropped + #define SI_MAX_ADDR_LEN 512 /* @@ -58,9 +61,8 @@ typedef struct { char* accum; // bytes being accumulated int nbytes; // allocated size of accumulator int ipt; // insertion point in accumulator - //int max; // size of accum - //int expected; // expected for a complete message int msg_size; // size of the message being accumulated + int flags; // RF_* constants } river_t; diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index fa603ff..55636bb 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -185,20 +185,33 @@ fprintf( stderr, "\n" ); river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later } if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size ); + + if( river->msg_size > river->nbytes ) { // message is too big, we will drop it + river->flags |= RF_DROP; + } } if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); - memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more + if( (river->flags & RF_DROP) == 0 ) { + memcpy( &river->accum[river->ipt], buf+bidx, remain ); // 
buffer and go wait for more + } river->ipt += remain; remain = 0; } else { need = river->msg_size - river->ipt; // bytes from transport we need to have complete message if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain ); - memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) - buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue + if( (river->flags & RF_DROP) == 0 ) { + memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) + buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue + river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator + } else { + if( !(river->flags & RF_NOTIFIED) ) { + fprintf( stderr, "[WRN] message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd ); + river->flags |= RF_NOTIFIED; + } + } - river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator river->msg_size = -1; river->ipt = 0; bidx += need; diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index c8c3b4a..e50b5d0 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -522,6 +522,10 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { invokes this. Internal functions (the route table collector) which need additional open ports without starting additional route table collectors, will invoke this directly with the proper flag. + + CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths + that we know about. The _user_ should ensure that the supplied length also + includes the trace data length maximum as they are in control of that. 
*/ static void* init( char* uproto_port, int max_msg_size, int flags ) { static int announced = 0; @@ -567,7 +571,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning ctx->d1_len = 4; // data1 space in header -- 4 bytes for now - ctx->max_ibm = max_msg_size; // default to user supplied message size + ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt + ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64; // add in our header size and a bit of fudge ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls -- 2.16.6