X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=0a4f80644f301430716e572f1c151d31395559c7;hb=6d112571b27574ae857da7cb8dc8758ffee4ff60;hp=fd8dcb0fe33c33feb364b6ebb05844ed82f2e184;hpb=02e8d49f42d94b51494977aa0d6f67479f1ceb1c;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index fd8dcb0..0a4f806 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -56,16 +56,18 @@ #include "si95/socket_if.h" #include "si95/siproto.h" +#define SI95_BUILD 1 // we drop some common functions for si #include "rmr.h" // things the users see #include "rmr_agnostic.h" // agnostic things (must be included before private) -#include "rmr_si_private.h" // things that we need too +#include "rmr_si_private.h" // things that we need too #include "rmr_symtab.h" +#include "rmr_logging.h" #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific #include "rtable_si_static.c" // route table things -- transport specific -#include "rtc_si_static.c" // specific RMR only route table collector (SI only for now) +#include "rtc_static.c" // route table collector (thread code) #include "tools_static.c" #include "sr_si_static.c" // send/receive static functions #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) @@ -183,7 +185,10 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full if( mbuf->tp_buf ) { free( mbuf->tp_buf ); + mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE } + + mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated) free( mbuf ); } } @@ -271,7 +276,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { errno = 0; // at this point any bad state is in msg returned if( msg->header == NULL ) { - fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" ); msg->state = RMR_ERR_NOHDR; msg->tp_state = errno; return msg; @@ -285,7 +290,8 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { */ if( (nn_sock = msg->rts_fd) < 0 ) { if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx ); + //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx ); + sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; @@ -293,7 +299,6 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { } } - msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip @@ -447,42 +452,42 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, if( exp_len > RMR_MAX_XID ) { exp_len = RMR_MAX_XID; } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect ); while( queued < allow2queue ) { msg = rcv_msg( ctx, msg ); // hard wait for next if( msg->state == RMR_OK ) { if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it - if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); return msg; } if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" ); errno = ENOBUFS; return NULL; } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype ); queued++; msg = NULL; } } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect ); errno = ETIMEDOUT; return NULL; } /* Set send timeout. The value time is assumed to be milliseconds. The timeout is the - _rough_ maximum amount of time that RMr will block on a send attempt when the underlying + _rough_ maximum amount of time that RMR will block on a send attempt when the underlying mechnism indicates eagain or etimeedout. All other error conditions are reported without this delay. Setting a timeout of 0 causes no retries to be attempted in RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning, but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) after every 1K send attempts until the "time" value is reached. Retries are abandoned - if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT. + if NNG returns anything other than EAGAIN or EINTER is returned. The default, if this function is not used, is 1; meaning that RMr will retry, but will not enter a sleep. In all cases the caller should check the status in the message returned @@ -511,7 +516,7 @@ extern int rmr_set_stimeout( void* vctx, int time ) { CAUTION: this is not supported as they must be set differently (between create and open) in NNG. */ extern int rmr_set_rtimeout( void* vctx, int time ) { - fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" ); + rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" ); return 0; } @@ -522,11 +527,15 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { invokes this. Internal functions (the route table collector) which need additional open ports without starting additional route table collectors, will invoke this directly with the proper flag. + + CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths + that we know about. The _user_ should ensure that the supplied length also + includes the trace data length maximum as they are in control of that. */ static void* init( char* uproto_port, int max_msg_size, int flags ) { static int announced = 0; uta_ctx_t* ctx = NULL; - char bind_info[NNG_MAXADDRLEN]; // bind info + char bind_info[256]; // bind info char* proto = "tcp"; // pointer into the proto/port string user supplied char* port; char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) @@ -537,12 +546,17 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port int state; int i; + int old_vlevel; + + old_vlevel = rmr_vlog_init(); // initialise and get the current level + rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } + rmr_set_vlevel( old_vlevel ); // return logging to the desired state errno = 0; if( uproto_port == NULL ) { @@ -557,7 +571,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } memset( ctx, 0, sizeof( uta_ctx_t ) ); - if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" ); ctx->nrivers = 256; // number of input flows we'll manage ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers ); memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers ); @@ -567,12 +581,21 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning ctx->d1_len = 4; // data1 space in header -- 4 bytes for now - ctx->max_ibm = max_msg_size; // default to user supplied message size + ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt + ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si + ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls + + if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on + uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock + uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock + } else { + rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" ); + } init_mtcall( ctx ); // set up call chutes + fd2ep_init( ctx ); // initialise the fd to endpoint sym tab - ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh if( max_msg_size > 0 ) { @@ -584,7 +607,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff if( ctx->si_ctx == NULL ) { - fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" ); + rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" ); free_ctx( ctx ); return NULL; } @@ -621,7 +644,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { free( tok ); } else { if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); return NULL; } if( (tok = strchr( wbuf, '.' )) != NULL ) { @@ -631,7 +654,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port - fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); return NULL; } @@ -647,11 +670,11 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } else { ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src if( ctx->my_ip == NULL ) { - fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" ); + rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer } } - if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip ); if( (tok = getenv( ENV_WARNINGS )) != NULL ) { if( *tok == '1' ) { @@ -666,26 +689,28 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) { - fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); free_ctx( ctx ); return NULL; } - if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC + if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself) + ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used + } else { if( static_rtc ) { if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader - fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); } } else { if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread - fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); } } } ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it - fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); } free( proto_port ); @@ -766,7 +791,7 @@ extern int rmr_get_rcvfd( void* vctx ) { /* if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { - fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) ); + rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) ); return -1; } */ @@ -886,7 +911,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { mbuf = ombuf; // return caller's buffer if they passed one in } else { errno = 0; // interrupted call state could be left; clear - if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued mbuf->state = RMR_OK;