X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=8697bc561cbb8dd3f2509e0548a9023832df727d;hb=0d4def6c7b673f3be486338ced65ccdd25a859ed;hp=e50b5d026695991e0a082b27153880f39bf6e5e5;hpb=a1575dacc478b945ea63f5d0cc3db3d66dcb5983;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index e50b5d0..8697bc5 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -59,8 +59,9 @@ #include "rmr.h" // things the users see #include "rmr_agnostic.h" // agnostic things (must be included before private) -#include "rmr_si_private.h" // things that we need too +#include "rmr_si_private.h" // things that we need too #include "rmr_symtab.h" +#include "rmr_logging.h" #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific @@ -271,7 +272,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { errno = 0; // at this point any bad state is in msg returned if( msg->header == NULL ) { - fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" ); msg->state = RMR_ERR_NOHDR; msg->tp_state = errno; return msg; @@ -447,42 +448,42 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, if( exp_len > RMR_MAX_XID ) { exp_len = RMR_MAX_XID; } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect ); while( queued < allow2queue ) { msg = rcv_msg( ctx, msg ); // hard wait for next if( msg->state == RMR_OK ) { if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it - if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); return msg; } if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" ); errno = ENOBUFS; return NULL; } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype ); queued++; msg = NULL; } } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect ); errno = ETIMEDOUT; return NULL; } /* Set send timeout. The value time is assumed to be milliseconds. The timeout is the - _rough_ maximum amount of time that RMr will block on a send attempt when the underlying + _rough_ maximum amount of time that RMR will block on a send attempt when the underlying mechnism indicates eagain or etimeedout. All other error conditions are reported without this delay. Setting a timeout of 0 causes no retries to be attempted in RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning, but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) after every 1K send attempts until the "time" value is reached. Retries are abandoned - if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT. + if NNG returns anything other than EAGAIN or EINTER is returned. The default, if this function is not used, is 1; meaning that RMr will retry, but will not enter a sleep. In all cases the caller should check the status in the message returned @@ -511,7 +512,7 @@ extern int rmr_set_stimeout( void* vctx, int time ) { CAUTION: this is not supported as they must be set differently (between create and open) in NNG. */ extern int rmr_set_rtimeout( void* vctx, int time ) { - fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" ); + rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" ); return 0; } @@ -541,12 +542,17 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port int state; int i; + int old_vlevel; + + old_vlevel = rmr_vlog_init(); // initialise and get the current level + rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } + rmr_set_vlevel( old_vlevel ); // return logging to the desired state errno = 0; if( uproto_port == NULL ) { @@ -561,7 +567,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } memset( ctx, 0, sizeof( uta_ctx_t ) ); - if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" ); ctx->nrivers = 256; // number of input flows we'll manage ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers ); memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers ); @@ -581,7 +587,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock } else { - fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" ); + rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" ); } init_mtcall( ctx ); // set up call chutes @@ -596,7 +602,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff if( ctx->si_ctx == NULL ) { - fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" ); + rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" ); free_ctx( ctx ); return NULL; } @@ -633,7 +639,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { free( tok ); } else { if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); return NULL; } if( (tok = strchr( wbuf, '.' )) != NULL ) { @@ -643,7 +649,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port - fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); return NULL; } @@ -659,11 +665,11 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } else { ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src if( ctx->my_ip == NULL ) { - fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" ); + rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer } } - if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip ); if( (tok = getenv( ENV_WARNINGS )) != NULL ) { if( *tok == '1' ) { @@ -678,7 +684,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) { - fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); + rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); free_ctx( ctx ); return NULL; } @@ -686,18 +692,18 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC if( static_rtc ) { if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader - fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); } } else { if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread - fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); } } } ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it - fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); } free( proto_port ); @@ -778,7 +784,7 @@ extern int rmr_get_rcvfd( void* vctx ) { /* if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { - fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) ); + rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) ); return -1; } */ @@ -898,7 +904,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { mbuf = ombuf; // return caller's buffer if they passed one in } else { errno = 0; // interrupted call state could be left; clear - if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued mbuf->state = RMR_OK;