X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=66724e6eaff70cd094be4a162d8480e139ceff87;hb=HEAD;hp=2b96424f90e012102d581f3608cf2682411b820d;hpb=fcea3951d44de0cc55d33c5e114487abe79d3406;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 2b96424..66724e6 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -1,8 +1,8 @@ // vim: ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019-2020 Nokia - Copyright (c) 2018-2020 AT&T Intellectual Property. + Copyright (c) 2019-2021 Nokia + Copyright (c) 2018-2021 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -56,34 +56,84 @@ #include "si95/socket_if.h" #include "si95/siproto.h" + #define SI95_BUILD 1 // we drop some common functions for si #include "rmr.h" // things the users see #include "rmr_agnostic.h" // agnostic things (must be included before private) #include "rmr_si_private.h" // things that we need too + #include "rmr_symtab.h" #include "rmr_logging.h" #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific #include "rtable_si_static.c" // route table things -- transport specific +#include "alarm.c" #include "rtc_static.c" // route table collector (thread code) #include "tools_static.c" #include "sr_si_static.c" // send/receive static functions #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) #include "mt_call_static.c" #include "mt_call_si_static.c" +#include "rmr_debug_si.c" // debuging functions //------------------------------------------------------------------------------ +/* + If we have an EP, up the counters based on state. + This isn't needed, but it makes driving the code under unit test easier so we + induldge in the bean counter's desire for coverage numbers. +*/ +static inline void incr_ep_counts( int state, endpoint_t* ep ) { + if( ep != NULL ) { + switch( state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } +} /* Clean up a context. */ static void free_ctx( uta_ctx_t* ctx ) { - if( ctx && ctx->rtg_addr ) { - free( ctx->rtg_addr ); + if( ctx ) { + if( ctx->rtg_addr ){ + free( ctx->rtg_addr ); + } + uta_ring_free( ctx->mring ); + uta_ring_free( ctx->zcb_mring ); + if( ctx->chutes ){ + free( ctx->chutes ); + } + if( ctx->fd2ep ){ + rmr_sym_free( ctx->fd2ep ); + } + if( ctx->my_name ){ + free( ctx->my_name ); + } + if( ctx->my_ip ){ + free( ctx->my_ip ); + } + if( ctx->rtable ){ + rmr_sym_free( ctx->rtable->hash ); + free( ctx->rtable ); + } + if ( ctx->ephash ){ + free( ctx->ephash ); + } + free( ctx ); } } @@ -163,6 +213,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned This provides an external path to the realloc static function as it's called by an outward facing mbuf api function. Used to reallocate a message with a different trace data size. + + User programmes must use this with CAUTION! The mbuf passed in is NOT freed and + is still valid following this call. The caller is reponsible for maintainting + a pointer to both old and new messages and invoking rmr_free_msg() on both! */ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { return realloc_msg( msg, new_tr_size ); @@ -174,9 +228,11 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { */ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { if( mbuf == NULL ) { + fprintf( stderr, ">>>FREE nil buffer\n" ); return; } +#ifdef KEEP if( mbuf->flags & MFL_HUGE || // don't cache oversized messages ! mbuf->ring || // cant cache if no ring ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full @@ -189,6 +245,16 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated) free( mbuf ); } +#else + // always free, never manage a pool + if( mbuf->tp_buf ) { + free( mbuf->tp_buf ); + mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE + } + + mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated) + free( mbuf ); +#endif } /* @@ -298,29 +364,14 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { } } - msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { - if( ep != NULL ) { - switch( msg->state ) { - case RMR_OK: - ep->scounts[EPSC_GOOD]++; - break; - - case RMR_ERR_RETRY: - ep->scounts[EPSC_TRANS]++; - break; - - default: - // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed - ep->scounts[EPSC_FAIL]++; - break; - } - } + incr_ep_counts( msg->state, ep ); // update counts + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always replace original source & ip so rts can be called again zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source @@ -527,6 +578,35 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { return 0; } +/* + Common cleanup on initialisation error. These are hard to force, and this helps to ensure + all code is tested by providing a callable rather than a block of "goto" code. + + There is a return value so that where we need this we get dinked only for one + uncovered line rather than two: + init_err(...); + return NULL; + + That's a hack, and is yet another example of the testing tail wagging the dog. +*/ +static inline void* init_err( char* msg, void* ctx, void* port, int errval ) { + if( errval != 0 ) { // if not letting it be what a sysllib set it to... + errno = errval; + } + + if( port ) { // free things if allocated + free( port ); + } + if( ctx ) { + free_ctx( ctx ); + } + + if( msg ) { // crit message if supplied + rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) ); + } + + return NULL; +} /* This is the actual init workhorse. The user visible function meerly ensures that the @@ -539,12 +619,13 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { that we know about. The _user_ should ensure that the supplied length also includes the trace data length maximum as they are in control of that. */ -static void* init( char* uproto_port, int def_msg_size, int flags ) { +static void* init( char* uproto_port, int def_msg_size, int flags ) { static int announced = 0; uta_ctx_t* ctx = NULL; char bind_info[256]; // bind info + char* my_ip = NULL; char* proto = "tcp"; // pointer into the proto/port string user supplied - char* port; + char* port; // pointer into the proto_port buffer at the port value char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) char* proto_port; char wbuf[1024]; // work buffer @@ -559,8 +640,8 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { if( ! announced ) { rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version - rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n", + uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; rmr_set_vlevel( old_vlevel ); // return logging to the desired state @@ -570,19 +651,25 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { errno = 0; if( uproto_port == NULL ) { proto_port = strdup( DEF_COMM_PORT ); + rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port ); } else { proto_port = strdup( uproto_port ); // so we can modify it } + if ( proto_port == NULL ){ + return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM ); + } + if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) { - errno = ENOMEM; - return NULL; + return init_err( "unable to allocate context", ctx, proto_port, ENOMEM ); } memset( ctx, 0, sizeof( uta_ctx_t ) ); if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" ); - ctx->nrivers = 256; // number of input flows we'll manage + ctx->snarf_rt_fd = -1; + ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers ); + ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to e hashed memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers ); for( i = 0; i < ctx->nrivers; i++ ) { ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet @@ -599,6 +686,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock + uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocatieon calls from userland require read lock, but can be fast } else { rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" ); } @@ -613,10 +701,9 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff if( ctx->si_ctx == NULL ) { - rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" ); - free_ctx( ctx ); - return NULL; + return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 ); } + SIset_tflags(ctx->si_ctx,SI_TF_QUICK); if( (port = strchr( proto_port, ':' )) != NULL ) { if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly @@ -628,6 +715,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { } else { port = proto_port; // assume something like "1234" was passed } + rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port ); if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener' static_rtc = 1; @@ -648,9 +736,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { free( tok ); } else { if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); - free_ctx( ctx ); - return NULL; + return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 ); } if( (tok = strchr( wbuf, '.' )) != NULL ) { *tok = 0; // we don't keep domain portion @@ -659,8 +745,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port - rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); - return NULL; + return init_err( "hostname + port is too long", ctx, proto_port, EINVAL ); } if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) { @@ -669,40 +754,68 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { } } - ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons + ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons + my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src + if( my_ip == NULL ) { + rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); + my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer + } + if( flags & RMRFL_NAME_ONLY ) { ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name } else { - ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src - if( ctx->my_ip == NULL ) { - rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); - ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer - } + ctx->my_ip = strdup( my_ip ); } - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip ); if( (tok = getenv( ENV_WARNINGS )) != NULL ) { if( *tok == '1' ) { - ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance) + ctx->flags |= CFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance) } } + if( (interface = getenv( ENV_BIND_IF )) == NULL ) { // if specific interface not defined, listen on all (IPv4, IPv6, or interface name) + /* + compares the first ip sussed out by mk_ip_list (returned by get_default_ip) + NOTE: this might be not work very predictable in dual-stack where an interface can have IPv4 and IPv6 addresses assigned, + meaning that it can select either IPv4 or IPv6 on applications restarts (depends on the order of IP addresses assigned on the interface) + */ + if( my_ip[0] == '[' ) { // IPv6 + interface = "[::]"; + } else { // IPv4 + interface = "0.0.0.0"; + if( flags & RMRFL_NAME_ONLY ) { // if hostname is given instead of IP in RMR source address + rmr_vlog( RMR_VL_WARN, "rmr_init: hostname:ip is provided as source information for rts() calls, falling back to any IPv4\n" ); + } + } + } - if( (interface = getenv( ENV_BIND_IF )) == NULL ) { - interface = "0.0.0.0"; + if( my_ip != NULL ) { + free( my_ip ); } - snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default + snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) { rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); - free_ctx( ctx ); - return NULL; + return init_err( NULL, ctx, proto_port, 0 ); } // finish all flag setting before threads to keep helgrind quiet ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way - ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used + + // ---------------- setup for route table collector before invoking ---------------------------------- + ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) ); // single mutex required to gate access to moving rtables + if( ctx->rtgate != NULL ) { + pthread_mutex_init( ctx->rtgate, NULL ); + } + + ctx->ephash = rmr_sym_alloc( 129 ); // host:port to ep symtab exists outside of any route table + if( ctx->ephash == NULL ) { + return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM ); + } + + ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case } else { @@ -816,6 +929,10 @@ extern void rmr_close( void* vctx ) { return; } + if( ctx->seed_rt_fname != NULL ) { + free( ctx->seed_rt_fname ); + } + ctx->shutdown = 1; SItp_stats( ctx->si_ctx ); // dump some interesting stats