X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=54a1bba883ff1af28b0eca702de16bfa83158f93;hb=8c6756d9d6f94beca0bc382f97383ca5e79d16c7;hp=8e499c1cd730d9feeb5e40504c55c95bbf6d1ff3;hpb=5200efe1e6dd13b1e1241ce623c4978151be34e8;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 8e499c1..54a1bba 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -1,8 +1,8 @@ // vim: ts=4 sw=4 noet : /* ================================================================================== - Copyright (c) 2019-2020 Nokia - Copyright (c) 2018-2020 AT&T Intellectual Property. + Copyright (c) 2019-2021 Nokia + Copyright (c) 2018-2021 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -57,15 +57,20 @@ #include "si95/siproto.h" +#define SI95_BUILD 1 // we drop some common functions for si + #include "rmr.h" // things the users see #include "rmr_agnostic.h" // agnostic things (must be included before private) -#include "rmr_si_private.h" // things that we need too +#include "rmr_si_private.h" // things that we need too + #include "rmr_symtab.h" +#include "rmr_logging.h" #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific #include "rtable_si_static.c" // route table things -- transport specific -#include "rtc_si_static.c" // specific RMR only route table collector (SI only for now) +#include "alarm.c" +#include "rtc_static.c" // route table collector (thread code) #include "tools_static.c" #include "sr_si_static.c" // send/receive static functions #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) @@ -75,15 +80,59 @@ //------------------------------------------------------------------------------ +/* + If we have an EP, up the counters based on state. + This isn't needed, but it makes driving the code under unit test easier so we + induldge in the bean counter's desire for coverage numbers. +*/ +static inline void incr_ep_counts( int state, endpoint_t* ep ) { + if( ep != NULL ) { + switch( state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } +} /* Clean up a context. */ static void free_ctx( uta_ctx_t* ctx ) { if( ctx ) { - if( ctx->rtg_addr ) { + if( ctx->rtg_addr ){ free( ctx->rtg_addr ); } + uta_ring_free( ctx->mring ); + uta_ring_free( ctx->zcb_mring ); + if( ctx->chutes ){ + free( ctx->chutes ); + } + if( ctx->fd2ep ){ + rmr_sym_free( ctx->fd2ep ); + } + if( ctx->my_name ){ + free( ctx->my_name ); + } + if( ctx->my_ip ){ + free( ctx->my_ip ); + } + if( ctx->rtable ){ + rmr_sym_free( ctx->rtable->hash ); + free( ctx->rtable ); + } + if ( ctx->ephash ){ + free( ctx->ephash ); + } + free( ctx ); } } @@ -100,8 +149,8 @@ static void free_ctx( uta_ctx_t* ctx ) { The allocated len stored in the msg is: transport header length + - message header + - user requested payload + message header + + user requested payload The msg header is a combination of the fixed RMR header and the variable trace data and d2 fields which may vary for each message. @@ -163,6 +212,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned This provides an external path to the realloc static function as it's called by an outward facing mbuf api function. 
Used to reallocate a message with a different trace data size.
+
+	User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
+	is still valid following this call. The caller is responsible for maintaining
+	a pointer to both old and new messages and invoking rmr_free_msg() on both!
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
 	return realloc_msg( msg, new_tr_size );
 }
@@ -173,19 +226,34 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
 	Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
-	//fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
-	//return;
-
 	if( mbuf == NULL ) {
+		fprintf( stderr, ">>>FREE nil buffer\n" );
 		return;
 	}
 
-	if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {			// just queue, free if ring is full
+#ifdef KEEP
+	if( mbuf->flags & MFL_HUGE ||				// don't cache oversized messages
+		! mbuf->ring ||							// can't cache if no ring
+		! uta_ring_insert( mbuf->ring, mbuf ) ) {	// or ring is full
+
 		if( mbuf->tp_buf ) {
 			free( mbuf->tp_buf );
+			mbuf->tp_buf = NULL;				// just in case user tries to reuse this mbuf; this will be an NPE
 		}
+
+		mbuf->cookie = 0;						// should signal a bad mbuf (if not reallocated)
 		free( mbuf );
 	}
+#else
+	// always free, never manage a pool
+	if( mbuf->tp_buf ) {
+		free( mbuf->tp_buf );
+		mbuf->tp_buf = NULL;					// just in case user tries to reuse this mbuf; this will be an NPE
+	}
+
+	mbuf->cookie = 0;							// should signal a bad mbuf (if not reallocated)
+	free( mbuf );
+#endif
 }
 
 /*
@@ -199,8 +267,8 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 		((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;	// must ensure call flag is off
 		d1 = DATA1_ADDR( msg->header );
-		d1[D1_CALLID_IDX] = NO_CALL_ID;			// must blot out so it doesn't queue on a chute at the other end
-	}
+		d1[D1_CALLID_IDX] = NO_CALL_ID;			// must blot out so it doesn't queue on a chute at the other end
+	}
 	return mtosend_msg( vctx, msg, max_to );
 }
@@ -218,7 +286,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 		d1 = DATA1_ADDR( msg->header );
 		d1[D1_CALLID_IDX] = NO_CALL_ID;			// must blot out so it doesn't queue on a chute at the other end
-	}
+	}
 	return rmr_mtosend_msg( vctx, msg, -1 );	// retries < 0 uses default from ctx
 }
@@ -226,16 +294,20 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 /*
 	Return to sender allows a message to be sent back to the endpoint where it originated.
-	In the SI world the file descriptor that was the source of the message is captured in
-	the mbuffer and thus can be used to quickly find the target for an RTS call.
+	With SI95 it was thought that the return to sender would be along the same open connection
+	and thus no table lookup would be needed to open a 'reverse direction' path. However, for
+	applications sending at high message rates, returning responses on the same connection
+	causes major strife. Thus the decision was made to use the same method as NNG and just
+	open a second connection for the reverse path.
+
+	We will attempt to use the name in the received message to look up the endpoint. If
+	that fails, then we will write on the connection that the message arrived on as a
+	fallback.
-	The source information in the message is used to select the socket on which to write
-	the message rather than using the message type and round-robin selection. This
-	should return a message buffer with the state of the send operation set.
On success - (state is RMR_OK, the caller may use the buffer for another receive operation), and on - error it can be passed back to this function to retry the send if desired. On error, - errno will liklely have the failure reason set by the nng send processing. - The following are possible values for the state in the message buffer: + On success (state is RMR_OK, the caller may use the buffer for another receive operation), + and on error it can be passed back to this function to retry the send if desired. On error, + errno will liklely have the failure reason set by the nng send processing. The following + are possible values for the state in the message buffer: Message states returned: RMR_ERR_BADARG - argument (context or msg) was nil or invalid @@ -248,13 +320,12 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { failure. The value of errno might give a clue as to what is wrong. CAUTION: - Like send_msg(), this is non-blocking and will return the msg if there is an errror. + Like send_msg(), this is non-blocking and will return the msg if there is an error. The caller must check for this and handle it properly. */ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { int nn_sock; // endpoint socket for send uta_ctx_t* ctx; - int state; char* hold_src; // we need the original source if send fails char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket @@ -271,7 +342,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { errno = 0; // at this point any bad state is in msg returned if( msg->header == NULL ) { - fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" ); msg->state = RMR_ERR_NOHDR; msg->tp_state = errno; return msg; @@ -279,45 +350,29 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off -/* - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx ); // src is always used first for rts + sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first if( ! sock_ok ) { -*/ - if( (nn_sock = msg->rts_fd) < 0 ) { - if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx ); - } - if( ! sock_ok ) { - msg->state = RMR_ERR_NOENDPT; - return msg; // preallocated msg can be reused since not given back to nn + if( (nn_sock = msg->rts_fd) < 0 ) { + if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known + sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); + } + if( ! 
sock_ok ) { + msg->state = RMR_ERR_NOENDPT; + return msg; + } } } - msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { - if( ep != NULL ) { - switch( msg->state ) { - case RMR_OK: - ep->scounts[EPSC_GOOD]++; - break; - - case RMR_ERR_RETRY: - ep->scounts[EPSC_TRANS]++; - break; - - default: - // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed - ep->scounts[EPSC_FAIL]++; - break; - } - } - strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again - strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again + incr_ep_counts( msg->state, ep ); // update counts + + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always replace original source & ip so rts can be called again + zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } @@ -330,7 +385,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { If multi-threading call is turned on, this invokes that mechanism with the special call id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original behavour (described below) is carried out. This is safe to use when mt is enabled, but - the user app is invoking rmr_call() from only one thread, and the caller doesn't need + the user app is invoking rmr_call() from only one thread, and the caller doesn't need a flexible timeout. On timeout this function will return a nil pointer. If the original message could not @@ -365,7 +420,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { return msg; } - return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec + return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec } /* @@ -414,6 +469,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { } /* + DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will + too. This function likely will not behave as expected in SI, and we are pretty sure it + isn't being used as there was an abort triggering reference to rmr_rcv() until now. + This blocks until the message with the 'expect' ID is received. Messages which are received before the expected message are queued onto the message ring. 
The function will return a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the @@ -447,42 +506,45 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, if( exp_len > RMR_MAX_XID ) { exp_len = RMR_MAX_XID; } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect ); while( queued < allow2queue ) { - msg = rcv_msg( ctx, msg ); // hard wait for next - if( msg->state == RMR_OK ) { - if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it - if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); - return msg; - } - - if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" ); - errno = ENOBUFS; - return NULL; + msg = rmr_rcv_msg( ctx, msg ); // hard wait for next + if( msg != NULL ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state ); + if( msg->state == RMR_OK ) { + if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued ); + return msg; + } + + if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" ); + errno = ENOBUFS; + return NULL; + } + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype ); + queued++; + msg = NULL; } - - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype ); - queued++; - msg = NULL; } } - if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect ); errno = ETIMEDOUT; return NULL; } /* Set send timeout. The value time is assumed to be milliseconds. The timeout is the - _rough_ maximum amount of time that RMr will block on a send attempt when the underlying + _rough_ maximum amount of time that RMR will block on a send attempt when the underlying mechnism indicates eagain or etimeedout. All other error conditions are reported without this delay. Setting a timeout of 0 causes no retries to be attempted in RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning, but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us) after every 1K send attempts until the "time" value is reached. Retries are abandoned - if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT. + if NNG returns anything other than EAGAIN or EINTER is returned. The default, if this function is not used, is 1; meaning that RMr will retry, but will not enter a sleep. In all cases the caller should check the status in the message returned @@ -511,10 +573,39 @@ extern int rmr_set_stimeout( void* vctx, int time ) { CAUTION: this is not supported as they must be set differently (between create and open) in NNG. 
*/ extern int rmr_set_rtimeout( void* vctx, int time ) { - fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" ); + rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" ); return 0; } +/* + Common cleanup on initialisation error. These are hard to force, and this helps to ensure + all code is tested by providing a callable rather than a block of "goto" code. + + There is a return value so that where we need this we get dinked only for one + uncovered line rather than two: + init_err(...); + return NULL; + + That's a hack, and is yet another example of the testing tail wagging the dog. +*/ +static inline void* init_err( char* msg, void* ctx, void* port, int errval ) { + if( errval != 0 ) { // if not letting it be what a sysllib set it to... + errno = errval; + } + + if( port ) { // free things if allocated + free( port ); + } + if( ctx ) { + free_ctx( ctx ); + } + + if( msg ) { // crit message if supplied + rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) ); + } + + return NULL; +} /* This is the actual init workhorse. The user visible function meerly ensures that the @@ -522,13 +613,17 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { invokes this. Internal functions (the route table collector) which need additional open ports without starting additional route table collectors, will invoke this directly with the proper flag. + + CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths + that we know about. The _user_ should ensure that the supplied length also + includes the trace data length maximum as they are in control of that. */ -static void* init( char* uproto_port, int max_msg_size, int flags ) { +static void* init( char* uproto_port, int def_msg_size, int flags ) { static int announced = 0; uta_ctx_t* ctx = NULL; - char bind_info[NNG_MAXADDRLEN]; // bind info + char bind_info[256]; // bind info char* proto = "tcp"; // pointer into the proto/port string user supplied - char* port; + char* port; // pointer into the proto_port buffer at the port value char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined) char* proto_port; char wbuf[1024]; // work buffer @@ -537,29 +632,42 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port int state; int i; + int old_vlevel; + + old_vlevel = rmr_vlog_init(); // initialise and get the current level if( ! 
announced ) { - fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version + rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n", + uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; + + rmr_set_vlevel( old_vlevel ); // return logging to the desired state + uta_dump_env(); // spit out environment settings meaningful to us if in info mode } errno = 0; if( uproto_port == NULL ) { proto_port = strdup( DEF_COMM_PORT ); + rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port ); } else { proto_port = strdup( uproto_port ); // so we can modify it } + if ( proto_port == NULL ){ + return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM ); + } + if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) { - errno = ENOMEM; - return NULL; + return init_err( "unable to allocate context", ctx, proto_port, ENOMEM ); } memset( ctx, 0, sizeof( uta_ctx_t ) ); - if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" ); - ctx->nrivers = 256; // number of input flows we'll manage + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" ); + ctx->snarf_rt_fd = -1; + ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers ); + ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to e hashed memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers ); for( i = 0; i < ctx->nrivers; i++ ) { ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet @@ -567,26 +675,31 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning ctx->d1_len = 4; // data1 space in header -- 4 bytes for now - ctx->max_ibm = max_msg_size; // default to user supplied message size + ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt + ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si + ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls + + if( ! 
(flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on + uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock + uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock + uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocatieon calls from userland require read lock, but can be fast + } else { + rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" ); + } init_mtcall( ctx ); // set up call chutes + fd2ep_init( ctx ); // initialise the fd to endpoint sym tab - ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh - if( max_msg_size > 0 ) { - ctx->max_plen = max_msg_size; + if( def_msg_size > 0 ) { + ctx->max_plen = def_msg_size; } - // we're using a listener to get rtg updates, so we do NOT need this. - //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors - ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff if( ctx->si_ctx == NULL ) { - fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" ); - free_ctx( ctx ); - return NULL; + return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 ); } if( (port = strchr( proto_port, ':' )) != NULL ) { @@ -599,11 +712,10 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } else { port = proto_port; // assume something like "1234" was passed } + rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port ); - if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener' - if( atoi( tok ) < 1 ) { - static_rtc = 1; - } + if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener' + static_rtc = 1; } if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system @@ -621,8 +733,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { free( tok ); } else { if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); - return NULL; + return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 ); } if( (tok = strchr( wbuf, '.' 
)) != NULL ) { *tok = 0; // we don't keep domain portion @@ -631,8 +742,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port - fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); - return NULL; + return init_err( "hostname + port is too long", ctx, proto_port, EINVAL ); } if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) { @@ -647,45 +757,65 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { } else { ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src if( ctx->my_ip == NULL ) { - fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" ); - strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer + rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); + ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer } } - if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip ); if( (tok = getenv( ENV_WARNINGS )) != NULL ) { if( *tok == '1' ) { - ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance) + ctx->flags |= CFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance) } } - if( (interface = getenv( ENV_BIND_IF )) == NULL ) { + if( (interface = getenv( ENV_BIND_IF )) == NULL ) { // if specific interface not defined, listen on all interface = "0.0.0.0"; } - - snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default + + snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) { - fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); - free_ctx( ctx ); - return NULL; + rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) ); + return init_err( NULL, ctx, proto_port, 0 ); + } + + // finish all flag setting before threads to keep helgrind quiet + ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way + + + // ---------------- setup for route table collector before invoking ---------------------------------- + ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) ); // single mutex required to gate access to moving rtables + if( ctx->rtgate != NULL ) { + pthread_mutex_init( ctx->rtgate, NULL ); + } + + ctx->ephash = rmr_sym_alloc( 129 ); // host:port to ep symtab exists outside of any route table + if( ctx->ephash == NULL ) { + return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM ); } - if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC + ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used + if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases + ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case + } else { 
+ ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread + if( static_rtc ) { + rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port ); if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader - fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); } } else { + rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port ); if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread - fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); } } } - ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it - fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); } free( proto_port ); @@ -708,8 +838,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { without drastically changing anything. The user should invoke with RMRFL_NONE to avoid any misbehavour as there are internal flags which are suported */ -extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { - return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off +extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) { + return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off } /* @@ -743,11 +873,7 @@ extern int rmr_ready( void* vctx ) { return FALSE; } - if( ctx->rtable != NULL ) { - return TRUE; - } - - return FALSE; + return ctx->rtable_ready; } /* @@ -764,13 +890,6 @@ extern int rmr_get_rcvfd( void* vctx ) { return -1; } -/* - if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { - fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) ); - return -1; - } -*/ - return uta_ring_getpfd( ctx->mring ); } @@ -790,6 +909,10 @@ extern void rmr_close( void* vctx ) { return; } + if( ctx->seed_rt_fname != NULL ) { + free( ctx->seed_rt_fname ); + } + ctx->shutdown = 1; SItp_stats( ctx->si_ctx ); // dump some interesting stats @@ -810,7 +933,6 @@ extern void rmr_close( void* vctx ) { */ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { uta_ctx_t* ctx; - uta_mhdr_t* hdr; // header in the transport buffer chute_t* chute; struct timespec ts; // time info if we have a timeout long new_ms; // adjusted mu-sec @@ -818,7 +940,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { long nano_sec; // max wait xlated to nano seconds int state; rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here - + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { errno = EINVAL; if( mbuf ) { @@ -834,9 +956,11 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW! 
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued + clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expriry time + sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop) if( ombuf ) { rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now - } + } } else { mbuf = ombuf; // return original if it was given with timeout status if( ombuf != NULL ) { @@ -845,6 +969,10 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { } } + if( mbuf != NULL ) { + mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src + } + return mbuf; } @@ -886,9 +1014,10 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { mbuf = ombuf; // return caller's buffer if they passed one in } else { errno = 0; // interrupted call state could be left; clear - if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued mbuf->state = RMR_OK; + mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src if( ombuf ) { rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring @@ -905,23 +1034,19 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { return mbuf; } -/* - Accept a message buffer and caller ID, send the message and then wait - for the receiver to tickle the semaphore letting us know that a message - has been received. The call_id is a value between 2 and 255, inclusive; if - it's not in this range an error will be returned. Max wait is the amount - of time in millaseconds that the call should block for. If 0 is given - then no timeout is set. - If the mt_call feature has not been initialised, then the attempt to use this - funciton will fail with RMR_ERR_NOTSUPP - If no matching message is received before the max_wait period expires, a - nil pointer is returned, and errno is set to ETIMEOUT. If any other error - occurs after the message has been sent, then a nil pointer is returned - with errno set to some other value. + +/* + This is the work horse for the multi-threaded call() function. It supports + both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description + for for rmr_mt_call() modulo the caveat below. + + If endpoint is given, then we assume that we're not doing normal route table + routing and that we should send directly to that endpoint (probably worm + hole). 
*/ -extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { +static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) { rmr_mbuf_t* ombuf; // original mbuf passed in uta_ctx_t* ctx; uta_mhdr_t* hdr; // header in the transport buffer @@ -932,7 +1057,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m long seconds = 0; // max wait seconds long nano_sec; // max wait xlated to nano seconds int state; - + errno = EINVAL; if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) { if( mbuf ) { @@ -948,12 +1073,6 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m return mbuf; } - if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them - mbuf->state = RMR_ERR_BADARG; - mbuf->tp_state = errno; - return mbuf; - } - ombuf = mbuf; // save to return timeout status with chute = &ctx->chutes[call_id]; @@ -961,7 +1080,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m rmr_free_msg( chute->mbuf ); chute->mbuf = NULL; } - + hdr = (uta_mhdr_t *) mbuf->header; hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for @@ -970,7 +1089,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend if( max_wait >= 0 ) { - clock_gettime( CLOCK_REALTIME, &ts ); + clock_gettime( CLOCK_REALTIME, &ts ); if( max_wait > 999 ) { seconds = max_wait / 1000; @@ -989,7 +1108,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m seconds = 1; // use as flag later to invoked timed wait } - mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + if( ep == NULL ) { // normal routing + mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + } else { + mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 ); + } if( mbuf ) { if( mbuf->state != RMR_OK ) { mbuf->tp_state = errno; @@ -1024,12 +1147,75 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m } mbuf = chute->mbuf; - mbuf->state = RMR_OK; + if( mbuf != NULL ) { + mbuf->state = RMR_OK; + } chute->mbuf = NULL; return mbuf; } +/* + Accept a message buffer and caller ID, send the message and then wait + for the receiver to tickle the semaphore letting us know that a message + has been received. The call_id is a value between 2 and 255, inclusive; if + it's not in this range an error will be returned. Max wait is the amount + of time in millaseconds that the call should block for. If 0 is given + then no timeout is set. + + If the mt_call feature has not been initialised, then the attempt to use this + funciton will fail with RMR_ERR_NOTSUPP + + If no matching message is received before the max_wait period expires, a + nil pointer is returned, and errno is set to ETIMEOUT. If any other error + occurs after the message has been sent, then a nil pointer is returned + with errno set to some other value. + + This is now just an outward facing wrapper so we can support wormhole calls. 
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+
+	// must vet call_id here, all others vetted by workhorse mt_call() function
+	if( call_id > MAX_CALL_ID || call_id < 2 ) {		// 0 and 1 are reserved; user app cannot supply them
+		if( mbuf != NULL ) {
+			mbuf->state = RMR_ERR_BADARG;
+			mbuf->tp_state = EINVAL;
+		}
+		return mbuf;
+	}
+
+	return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+
+/*
+	Given an existing message buffer, reallocate the payload portion to
+	be at least new_len bytes.  The message header will remain such that
+	the caller may use the rmr_rts_msg() function to return a payload
+	to the sender.
+
+	The mbuf passed in may or may not be reallocated and the caller must
+	use the returned pointer and should NOT assume that it can use the
+	pointer passed in, except as described for the clone flag.
+
+	If the clone flag is set, then a duplicated message, with a larger payload
+	size, is allocated and returned.  The old_msg pointer in this situation is
+	still valid and must be explicitly freed by the application. If the clone
+	flag is not set (0), then any memory management of the old message is
+	handled by the function.
+
+	If the copy flag is set, the contents of the old message's payload are
+	copied to the reallocated payload.  If the flag is not set, then the
+	contents of the payload are undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+	if( old_msg == NULL ) {
+		return NULL;
+	}
+
+	return realloc_payload( old_msg, new_len, copy, clone );	// message allocation is transport specific, so this is a passthrough
+}
+
 /*
	Enable low latency things in the transport (when supported).
 */
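
A minimal sketch of the ownership pattern implied by the rmr_realloc_msg() comment above: the call frees neither buffer, so the application must eventually release both. It assumes an already initialised RMR context (mrc) from rmr_init(); the payload and trace sizes are illustrative only, not values taken from this file.

#include <rmr/rmr.h>

// illustrative only: grow the trace area of a message and release both buffers
static void resize_trace_example( void* mrc ) {
	rmr_mbuf_t* msg;
	rmr_mbuf_t* bigger;

	if( (msg = rmr_alloc_msg( mrc, 1024 )) == NULL ) {			// 1024 byte payload is an arbitrary choice
		return;
	}

	if( (bigger = rmr_realloc_msg( msg, 128 )) == NULL ) {		// ask for 128 bytes of trace space
		rmr_free_msg( msg );									// realloc failed; the original is still ours to free
		return;
	}

	// ... fill in and use 'bigger' here ...

	rmr_free_msg( bigger );		// per the comment above, BOTH messages must be freed
	rmr_free_msg( msg );		// the old mbuf stays valid and is not released by rmr_realloc_msg()
}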
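A minimal receiver sketch showing how rmr_rts_msg() is typically driven against the behaviour described above (endpoint looked up by the source name, with the arrival connection as a fallback). The listen port (4560), response message type (101) and the single retry are assumptions for illustration, not values taken from this file.

#include <stdio.h>
#include <unistd.h>
#include <rmr/rmr.h>

int main( void ) {
	void*		mrc;
	rmr_mbuf_t*	msg = NULL;

	if( (mrc = rmr_init( "4560", 1024, RMRFL_NONE )) == NULL ) {	// listen on 4560 with a 1024 byte default payload
		return 1;
	}

	while( ! rmr_ready( mrc ) ) {		// wait for a route table so send/rts can resolve endpoints
		sleep( 1 );
	}
	rmr_set_stimeout( mrc, 1 );			// retry transient send failures without sleeping

	while( 1 ) {
		msg = rmr_rcv_msg( mrc, msg );					// block for the next message, reusing the buffer
		if( msg == NULL || msg->state != RMR_OK ) {
			continue;
		}

		msg->mtype = 101;								// hypothetical ack type
		msg->sub_id = RMR_VOID_SUBID;
		msg->len = snprintf( (char *) msg->payload, rmr_payload_size( msg ), "ack" ) + 1;

		msg = rmr_rts_msg( mrc, msg );					// non-blocking; the mbuf comes back with state set
		if( msg != NULL && msg->state == RMR_ERR_RETRY ) {
			msg = rmr_rts_msg( mrc, msg );				// one simple retry; a real app would bound or back off
		}
	}

	rmr_close( mrc );		// not reached in this sketch
	return 0;
}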
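A sketch of a blocking request/response using rmr_mt_call() together with rmr_realloc_payload(). The call id (2), request type (60), sizes and the two second timeout are assumed values; the only hard requirements stated above are that the call id be between 2 and 255 and that multi-threaded call support is enabled (always the case for this SI95 build).

#include <stdio.h>
#include <rmr/rmr.h>

// illustrative only: send a request and wait for the matching reply
static rmr_mbuf_t* fetch_info( void* mrc ) {
	rmr_mbuf_t* msg;

	if( (msg = rmr_alloc_msg( mrc, 512 )) == NULL ) {
		return NULL;
	}

	if( rmr_payload_size( msg ) < 2048 ) {								// need more room than was allocated
		if( (msg = rmr_realloc_payload( msg, 2048, 0, 0 )) == NULL ) {	// no copy, no clone: old mbuf managed for us
			return NULL;												// on failure the sketch simply gives up
		}
	}

	msg->mtype = 60;													// hypothetical request type
	msg->sub_id = RMR_VOID_SUBID;
	msg->len = snprintf( (char *) msg->payload, rmr_payload_size( msg ), "request" ) + 1;

	msg = rmr_mt_call( mrc, msg, 2, 2000 );		// call id must be 2..255; wait up to 2 seconds
	if( msg == NULL ) {							// nil on timeout or other failure (check errno)
		return NULL;
	}
	if( msg->state != RMR_OK ) {
		rmr_free_msg( msg );
		return NULL;
	}

	return msg;									// caller owns the response and must free it
}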