This provides an external path to the realloc static function as it's called by an
outward facing mbuf api function. Used to reallocate a message with a different
trace data size.
+
+ User programmes must use this with CAUTION! The mbuf passed in is NOT freed and
+ is still valid following this call. The caller is reponsible for maintainting
+ a pointer to both old and new messages and invoking rmr_free_msg() on both!
*/
extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
return realloc_msg( msg, new_tr_size );
*/
extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
if( mbuf == NULL ) {
+ fprintf( stderr, ">>>FREE nil buffer\n" );
return;
}
+#ifdef KEEP
if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
! mbuf->ring || // cant cache if no ring
! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
free( mbuf );
}
+#else
+ // always free, never manage a pool
+ if( mbuf->tp_buf ) {
+ free( mbuf->tp_buf );
+ mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
+ }
+
+ mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
+ free( mbuf );
+#endif
}
/*
that we know about. The _user_ should ensure that the supplied length also
includes the trace data length maximum as they are in control of that.
*/
-static void* init( char* uproto_port, int def_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
static int announced = 0;
uta_ctx_t* ctx = NULL;
char bind_info[256]; // bind info
if( ! announced ) {
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
- RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
proto_port = strdup( DEF_COMM_PORT );
+ rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
} else {
proto_port = strdup( uproto_port ); // so we can modify it
}
memset( ctx, 0, sizeof( uta_ctx_t ) );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
- ctx->nrivers = 256; // number of input flows we'll manage
+ ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max
ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+ ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to e hashed
memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
for( i = 0; i < ctx->nrivers; i++ ) {
ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
+ uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocatieon calls from userland require read lock, but can be fast
} else {
rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
}
} else {
port = proto_port; // assume something like "1234" was passed
}
+ rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
static_rtc = 1;
}
- if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
+ if( (interface = getenv( ENV_BIND_IF )) == NULL ) { // if specific interface not defined, listen on all
interface = "0.0.0.0";
}
- snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
+ snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
goto err;