X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=b8bb2a0567d83565e617005a5e2485b0069e8d22;hb=b0a8a9c1d7bb07d2708a8c1a7b580592756d012e;hp=80d7408c05d0d259055de5f8512358cf825a8f3f;hpb=ce1c741c01e8387cb095dac5e36a4d8ad91d006d;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 80d7408..b8bb2a0 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -67,6 +67,7 @@ #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific #include "rtable_si_static.c" // route table things -- transport specific +#include "alarm.c" #include "rtc_static.c" // route table collector (thread code) #include "tools_static.c" #include "sr_si_static.c" // send/receive static functions @@ -187,6 +188,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned This provides an external path to the realloc static function as it's called by an outward facing mbuf api function. Used to reallocate a message with a different trace data size. + + User programmes must use this with CAUTION! The mbuf passed in is NOT freed and + is still valid following this call. The caller is reponsible for maintainting + a pointer to both old and new messages and invoking rmr_free_msg() on both! */ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { return realloc_msg( msg, new_tr_size ); @@ -198,9 +203,11 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { */ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { if( mbuf == NULL ) { + fprintf( stderr, ">>>FREE nil buffer\n" ); return; } +#ifdef KEEP if( mbuf->flags & MFL_HUGE || // don't cache oversized messages ! mbuf->ring || // cant cache if no ring ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full @@ -213,6 +220,16 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated) free( mbuf ); } +#else + // always free, never manage a pool + if( mbuf->tp_buf ) { + free( mbuf->tp_buf ); + mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE + } + + mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated) + free( mbuf ); +#endif } /* @@ -563,7 +580,7 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { that we know about. The _user_ should ensure that the supplied length also includes the trace data length maximum as they are in control of that. */ -static void* init( char* uproto_port, int def_msg_size, int flags ) { +static void* init( char* uproto_port, int def_msg_size, int flags ) { static int announced = 0; uta_ctx_t* ctx = NULL; char bind_info[256]; // bind info @@ -583,8 +600,8 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { if( ! announced ) { rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version - rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n", + uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; rmr_set_vlevel( old_vlevel ); // return logging to the desired state @@ -594,6 +611,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { errno = 0; if( uproto_port == NULL ) { proto_port = strdup( DEF_COMM_PORT ); + rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port ); } else { proto_port = strdup( uproto_port ); // so we can modify it } @@ -610,8 +628,10 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { memset( ctx, 0, sizeof( uta_ctx_t ) ); if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" ); - ctx->nrivers = 256; // number of input flows we'll manage + ctx->snarf_rt_fd = -1; + ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers ); + ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to e hashed memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers ); for( i = 0; i < ctx->nrivers; i++ ) { ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet @@ -657,6 +677,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { } else { port = proto_port; // assume something like "1234" was passed } + rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port ); if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener' static_rtc = 1; @@ -717,11 +738,11 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { } - if( (interface = getenv( ENV_BIND_IF )) == NULL ) { + if( (interface = getenv( ENV_BIND_IF )) == NULL ) { // if specific interface not defined, listen on all interface = "0.0.0.0"; } - snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default + snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) { rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); goto err;