X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=c8c3b4aa384dcc35a9733e8383ab8b454cb65b1c;hb=c1f84f8a4a4e2b90ad9ec18aba2b5365d3e51386;hp=b1bfd1fb27f3ff56dc2bb6463bc307ab6ff74451;hpb=ec88d3c0563eeb6ae5f73427edb0b3c4d7acf299;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index b1bfd1f..c8c3b4a 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -65,8 +65,7 @@ #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific #include "rtable_si_static.c" // route table things -- transport specific -#include "rtc_static.c" // route table collector -#include "rtc_si_static.c" // our private test function +#include "rtc_si_static.c" // specific RMR only route table collector (SI only for now) #include "tools_static.c" #include "sr_si_static.c" // send/receive static functions #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) @@ -527,7 +526,7 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { static void* init( char* uproto_port, int max_msg_size, int flags ) { static int announced = 0; uta_ctx_t* ctx = NULL; - char bind_info[NNG_MAXADDRLEN]; // bind info + char bind_info[256]; // bind info char* proto = "tcp"; // pointer into the proto/port string user supplied char* port; char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) @@ -535,11 +534,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer char* tok2; + int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port int state; int i; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -570,9 +570,16 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->max_ibm = max_msg_size; // default to user supplied message size ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si + ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls + + if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on + uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock + uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock + } else { + fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" ); + } init_mtcall( ctx ); // set up call chutes - ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh if( max_msg_size > 0 ) { @@ -600,6 +607,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } + if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener' + if( atoi( tok ) < 1 ) { + static_rtc = 1; + } + } + if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system tok = strdup( tok ); // something we can destroy if( *tok == '[' ) { // we allow an ipv6 address here @@ -665,13 +678,18 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { return NULL; } - if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc - if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread - fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC + if( static_rtc ) { + if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader + fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); + } + } else { + if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread + fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); + } } } - //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx ); ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); @@ -1019,6 +1037,34 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m return mbuf; } +/* + Given an existing message buffer, reallocate the payload portion to + be at least new_len bytes. The message header will remain such that + the caller may use the rmr_rts_msg() function to return a payload + to the sender. + + The mbuf passed in may or may not be reallocated and the caller must + use the returned pointer and should NOT assume that it can use the + pointer passed in with the exceptions based on the clone flag. + + If the clone flag is set, then a duplicated message, with larger payload + size, is allocated and returned. The old_msg pointer in this situation is + still valid and must be explicitly freed by the application. If the clone + message is not set (0), then any memory management of the old message is + handled by the function. + + If the copy flag is set, the contents of the old message's payload is + copied to the reallocated payload. If the flag is not set, then the + contents of the payload is undetermined. +*/ +extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) { + if( old_msg == NULL ) { + return NULL; + } + + return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough +} + /* Enable low latency things in the transport (when supported). */