# API and build change and fix summaries. Doc corrections
# and/or changes are not mentioned here; see the commit messages.
+2021 January 21; Version 4.5.2
+ Fixes the excessive TCP session bug when sending to a slow receiver
+	and a related segmentation fault because of too many open file descriptors.
+ (RIC-735)
+
+2021 January 19; Version 4.5.1
+ Version bump to work round a CI job bug preventing push of the 4.5.0
+ packages from staging to release in package cloud. (RIC-732)
+
2021 January 8; Version 4.5.0
Version bump for next release tracking.
Corrected a potential locking issue in message allocation. (RIC-732)
set( major_version "4" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "5" )
-set( patch_level "0" )
+set( patch_level "2" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_inc "include/rmr" )
version 4.0.0, the RMR versions should no longer skip.
+2021 January 21; Version 4.5.2
+------------------------------
+
+Fixes the excessive TCP session bug when sending to a slow
+receiver and a related segmentation fault because of too many
+open file descriptors. (RIC-735)
+
+
+
+2021 January 19; Version 4.5.1
+------------------------------
+
+Version bump to work round a CI job bug preventing push of
+the 4.5.0 packages from staging to release in package cloud.
+(RIC-732)
+
+
+
2021 January 8; Version 4.5.0
-----------------------------
	The return value is the number of bytes actually copied. If 0 bytes are copied
errno should indicate the reason. If 0 is returned and errno is 0, then size
passed was 0. The state in the message is left UNCHANGED.
+
+ Regardless of action taken (actual realloc or not) the caller's reference to mbuf
+	is still valid following the call and will point to the correct spot (same tp
+ buffer if no realloc needed, or the new one if there was).
*/
extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size ) {
uta_mhdr_t* hdr;
if( len != size ) { // different sized trace data, must realloc the buffer
nm = rmr_realloc_msg( msg, size ); // realloc with changed trace size
- old_tp_buf = msg->tp_buf;
+ old_tp_buf = msg->tp_buf; // hold to repoint new mbuf at small buffer
old_hdr = msg->header;
msg->tp_buf = nm->tp_buf; // reference the reallocated buffer
msg->xaction = nm->xaction;
msg->payload = nm->payload;
- nm->tp_buf = old_tp_buf; // set to free
+ nm->tp_buf = old_tp_buf; // set to free; point to the small buffer
nm->header = old_hdr; // nano frees on hdr, so must set both
rmr_free_msg( nm );
#define SI_MAX_ADDR_LEN 512
+#define MAX_RIVERS 1024 // max number of directly mapped rivers
/*
Manages a river of inbound bytes.
si_ctx_t* si_ctx; // the socket context
int nrivers; // allocated rivers
river_t* rivers; // inbound flows (index is the socket fd)
+ void* river_hash; // flows with fd values > nrivers must be mapped through the hash
int max_ibm; // max size of an inbound message (river accum alloc size)
void* zcb_mring; // zero copy buffer mbuf ring
void* fd2ep; // the symtab mapping file des to endpoints for cleanup on disconnect
-// : vi ts=4 sw=4 noet:
+	// : vi ts=4 sw=4 noet:
/*
==================================================================================
Copyright (c) 2020 Nokia
#include <semaphore.h>
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
- static int warned = 0;
+ static time_t last_warning = 0;
+ static long dcount = 0;
+
chute_t* chute;
if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
- if( !warned ) {
- rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
- warned++;
+ dcount++;
+ if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", dcount );
+ last_warning = time( NULL );
+ dcount = 0;
}
return;
}
}
}
+ } else {
+ free( raw_msg );
}
}
unsigned char* old_accum; // old accumulator reference should we need to realloc
int bidx = 0; // transport buffer index
int remain; // bytes in transport buf that need to be moved
- int* mlen; // pointer to spot in buffer for conversion to int
+ int* mlen; // pointer to spot in buffer for conversion to int
int need; // bytes needed for something
int i;
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
- if( fd >= ctx->nrivers || fd < 0 ) {
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
- return SI_RET_OK;
- }
} else {
ctx = (uta_ctx_t *) vctx;
}
- if( buflen <= 0 ) {
+ if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd
return SI_RET_OK;
}
- river = &ctx->rivers[fd];
+ if( fd >= ctx->nrivers ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+ if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+ river = (river_t *) malloc( sizeof( *river ) );
+ memset( river, 0, sizeof( *river ) );
+ rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+ river->state = RS_NEW;
+ }
+ } else {
+ river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD
+ }
+
if( river->state != RS_GOOD ) { // all states which aren't good require reset first
if( river->state == RS_NEW ) {
+ if( river->accum != NULL ) {
+ free( river->accum );
+ }
memset( river, 0, sizeof( *river ) );
river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
river->accum = (char *) malloc( river->nbytes );
static int mt_disc_cb( void* vctx, int fd ) {
uta_ctx_t* ctx;
endpoint_t* ep;
+ river_t* river = NULL;
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
+ if( fd < ctx->nrivers && fd >= 0 ) {
+ river = &ctx->rivers[fd];
+ } else {
+ if( fd > 0 ) {
+ river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+ }
+ }
+
+ if( river != NULL ) {
+ river->state = RS_NEW; // if one connects here later; ensure it's new
+ if( river->accum != NULL ) {
+ free( river->accum );
+ river->accum = NULL;
+ river->state = RS_NEW; // force realloc if the fd is used again
+ }
+ }
+
ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
if( ep != NULL ) {
- pthread_mutex_lock( &ep->gate ); // wise to lock this
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
ep->open = FALSE;
ep->nn_sock = -1;
- pthread_mutex_unlock( &ep->gate );
+ pthread_mutex_unlock( &ep->gate );
}
return SI_RET_OK;
return NULL;
}
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+ rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() );
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
This provides an external path to the realloc static function as it's called by an
outward facing mbuf api function. Used to reallocate a message with a different
trace data size.
+
+	User programs must use this with CAUTION! The mbuf passed in is NOT freed and
+	is still valid following this call. The caller is responsible for maintaining
+	a pointer to both old and new messages and invoking rmr_free_msg() on both!
*/
extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
return realloc_msg( msg, new_tr_size );
*/
extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
if( mbuf == NULL ) {
+ fprintf( stderr, ">>>FREE nil buffer\n" );
return;
}
+#ifdef KEEP
if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
! mbuf->ring || // cant cache if no ring
! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
free( mbuf );
}
+#else
+ // always free, never manage a pool
+ if( mbuf->tp_buf ) {
+ free( mbuf->tp_buf );
+ mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
+ }
+
+ mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
+ free( mbuf );
+#endif
}
/*
that we know about. The _user_ should ensure that the supplied length also
includes the trace data length maximum as they are in control of that.
*/
-static void* init( char* uproto_port, int def_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
static int announced = 0;
uta_ctx_t* ctx = NULL;
char bind_info[256]; // bind info
if( ! announced ) {
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
- RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
proto_port = strdup( DEF_COMM_PORT );
+ rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
} else {
proto_port = strdup( uproto_port ); // so we can modify it
}
memset( ctx, 0, sizeof( uta_ctx_t ) );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
- ctx->nrivers = 256; // number of input flows we'll manage
+ ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max
ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+	ctx->river_hash = rmr_sym_alloc( 129 );		// connections with fd values > FD_MAX have to be hashed
memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
for( i = 0; i < ctx->nrivers; i++ ) {
ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
} else {
port = proto_port; // assume something like "1234" was passed
}
+ rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
static_rtc = 1;
*
* Date: 27 March 1995
* Author: E. Scott Daniels
-* Mod: 22 Feb 2002 - To better process queued data
+* Mod: 22 Feb 2002 - To better process queued data
* 14 Feb 2020 - To fix index bug if fd < 0.
*
*****************************************************************************
*/
-#include "sisetup.h" // get setup stuff
+#include "sisetup.h" // get setup stuff
#include "sitransport.h"
/*
//extern int SIsendt_nq( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
int status = SI_ERROR; // assume we fail
- fd_set writefds; // local write fdset to check blockage
- fd_set execpfds; // exception fdset to check errors
- struct tp_blk *tpptr; // pointer at the tp_blk for the session
- struct ioq_blk *qptr; // pointer at i/o queue block
- struct timeval time; // delay time parameter for select call
+ fd_set writefds; // local write fdset to check blockage
+ fd_set execpfds; // exception fdset to check errors
+ struct tp_blk *tpptr; // pointer at the tp_blk for the session
+ struct ioq_blk *qptr; // pointer at i/o queue block
+ struct timeval time; // delay time parameter for select call
int sidx = 0; // send index
errno = EINVAL;
tpptr->sent++; // investigate: this may over count
- FD_ZERO( &writefds ); // clear for select call
- FD_SET( fd, &writefds ); // set to see if this one was writable
- FD_ZERO( &execpfds ); // clear and set execptions fdset
+ FD_ZERO( &writefds ); // clear for select call
+ FD_SET( fd, &writefds ); // set to see if this one was writable
+ FD_ZERO( &execpfds ); // clear and set execptions fdset
FD_SET( fd, &execpfds );
time.tv_sec = 0; // set both to 0 if we just want a poll, else we block at max this amount
time.tv_usec = 1; // small pause on check to help drain things
if( select( fd + 1, NULL, &writefds, &execpfds, &time ) > 0 ) { // would block if <= 0
- if( FD_ISSET( fd, &execpfds ) ) { // error?
+ if( FD_ISSET( fd, &execpfds ) ) { // error?
errno = EBADFD;
- SIterm( gptr, tpptr ); // mark block for deletion when safe
+ SIterm( gptr, tpptr ); // mark block for deletion when safe
return SI_ERROR; // and bail from this sinking ship
} else {
errno = 0;
status = SI_ERR_BLOCKED;
}
} else {
- errno = EBADFD; // fd in a bad state (probably losed)
+ errno = EBADFD; // fd in a bad state (probably lost)
}
return status;
		rmr_free_msg( msg );					// not wanting a message back, trash this one
return NULL;
}
- } else { // send failed -- return original message
- if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
+ } else { // send failed or would block -- return original message
+ if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
errno = EAGAIN;
- msg->state = RMR_ERR_RETRY; // errno will have nano reason
+ msg->state = RMR_ERR_RETRY;
} else {
+ rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
msg->state = RMR_ERR_SENDFAILED;
}