X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=d1b9f26ca660bf7be312d96c1e3c7807197c2212;hb=refs%2Fchanges%2F36%2F3336%2F1;hp=9c67dc37821f109bde93f2aed26f21ed3c6a276c;hpb=99584a241c64d29fc20e74a4b4e01427d0f00e73;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 9c67dc3..d1b9f26 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -82,10 +82,8 @@ Clean up a context. */ static void free_ctx( uta_ctx_t* ctx ) { - if( ctx ) { - if( ctx->rtg_addr ) { - free( ctx->rtg_addr ); - } + if( ctx && ctx->rtg_addr ) { + free( ctx->rtg_addr ); } } @@ -175,14 +173,14 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { Return the message to the available pool, or free it outright. */ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) { - //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf ); - //return; - if( mbuf == NULL ) { return; } - if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full + if( mbuf->flags & MFL_HUGE || // don't cache oversized messages + ! mbuf->ring || // cant cache if no ring + ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full + if( mbuf->tp_buf ) { free( mbuf->tp_buf ); mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE @@ -204,7 +202,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off d1 = DATA1_ADDR( msg->header ); - d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end + d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end } return mtosend_msg( vctx, msg, max_to ); @@ -263,7 +261,6 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { int nn_sock; // endpoint socket for send uta_ctx_t* ctx; - int state; char* hold_src; // we need the original source if send fails char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket @@ -373,7 +370,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { return msg; } - return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec + return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec } /* @@ -535,7 +532,7 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { that we know about. The _user_ should ensure that the supplied length also includes the trace data length maximum as they are in control of that. */ -static void* init( char* uproto_port, int max_msg_size, int flags ) { +static void* init( char* uproto_port, int def_msg_size, int flags ) { static int announced = 0; uta_ctx_t* ctx = NULL; char bind_info[256]; // bind info @@ -555,7 +552,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc if( ! announced ) { - rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -584,7 +581,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning ctx->d1_len = 4; // data1 space in header -- 4 bytes for now - ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt + ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si @@ -601,8 +598,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh - if( max_msg_size > 0 ) { - ctx->max_plen = max_msg_size; + if( def_msg_size > 0 ) { + ctx->max_plen = def_msg_size; } // we're using a listener to get rtg updates, so we do NOT need this. @@ -626,10 +623,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } - if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener' - if( atoi( tok ) < 1 ) { - static_rtc = 1; - } + if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener' + static_rtc = 1; } if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system @@ -674,7 +669,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src if( ctx->my_ip == NULL ) { rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); - strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer + ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer } } if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip ); @@ -697,21 +692,25 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { return NULL; } - if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself) + // finish all flag setting before threads to keep helgrind quiet + ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way + + if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself) ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used } else { if( static_rtc ) { + rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port ); if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); } } else { + rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port ); if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); } } } - ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); } @@ -736,8 +735,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { without drastically changing anything. The user should invoke with RMRFL_NONE to avoid any misbehavour as there are internal flags which are suported */ -extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { - return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off +extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) { + return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off } /* @@ -838,7 +837,6 @@ extern void rmr_close( void* vctx ) { */ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { uta_ctx_t* ctx; - uta_mhdr_t* hdr; // header in the transport buffer chute_t* chute; struct timespec ts; // time info if we have a timeout long new_ms; // adjusted mu-sec @@ -873,6 +871,10 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { } } + if( mbuf != NULL ) { + mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src + } + return mbuf; } @@ -917,6 +919,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued mbuf->state = RMR_OK; + mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src if( ombuf ) { rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring @@ -933,23 +936,19 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { return mbuf; } -/* - Accept a message buffer and caller ID, send the message and then wait - for the receiver to tickle the semaphore letting us know that a message - has been received. The call_id is a value between 2 and 255, inclusive; if - it's not in this range an error will be returned. Max wait is the amount - of time in millaseconds that the call should block for. If 0 is given - then no timeout is set. - If the mt_call feature has not been initialised, then the attempt to use this - funciton will fail with RMR_ERR_NOTSUPP - If no matching message is received before the max_wait period expires, a - nil pointer is returned, and errno is set to ETIMEOUT. If any other error - occurs after the message has been sent, then a nil pointer is returned - with errno set to some other value. + +/* + This is the work horse for the multi-threaded call() function. It supports + both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description + for for rmr_mt_call() modulo the caveat below. + + If endpoint is given, then we assume that we're not doing normal route table + routing and that we should send directly to that endpoint (probably worm + hole). */ -extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { +static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) { rmr_mbuf_t* ombuf; // original mbuf passed in uta_ctx_t* ctx; uta_mhdr_t* hdr; // header in the transport buffer @@ -976,12 +975,6 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m return mbuf; } - if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them - mbuf->state = RMR_ERR_BADARG; - mbuf->tp_state = errno; - return mbuf; - } - ombuf = mbuf; // save to return timeout status with chute = &ctx->chutes[call_id]; @@ -1017,7 +1010,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m seconds = 1; // use as flag later to invoked timed wait } - mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + if( ep == NULL ) { // normal routing + mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + } else { + mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 ); + } if( mbuf ) { if( mbuf->state != RMR_OK ) { mbuf->tp_state = errno; @@ -1052,12 +1049,45 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m } mbuf = chute->mbuf; - mbuf->state = RMR_OK; + if( mbuf != NULL ) { + mbuf->state = RMR_OK; + } chute->mbuf = NULL; return mbuf; } +/* + Accept a message buffer and caller ID, send the message and then wait + for the receiver to tickle the semaphore letting us know that a message + has been received. The call_id is a value between 2 and 255, inclusive; if + it's not in this range an error will be returned. Max wait is the amount + of time in millaseconds that the call should block for. If 0 is given + then no timeout is set. + + If the mt_call feature has not been initialised, then the attempt to use this + funciton will fail with RMR_ERR_NOTSUPP + + If no matching message is received before the max_wait period expires, a + nil pointer is returned, and errno is set to ETIMEOUT. If any other error + occurs after the message has been sent, then a nil pointer is returned + with errno set to some other value. + + This is now just an outward facing wrapper so we can support wormhole calls. +*/ +extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { + + // must vet call_id here, all others vetted by workhorse mt_call() function + if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them + mbuf->state = RMR_ERR_BADARG; + mbuf->tp_state = EINVAL; + return mbuf; + } + + return mt_call( vctx, mbuf, call_id, max_wait, NULL ); +} + + /* Given an existing message buffer, reallocate the payload portion to be at least new_len bytes. The message header will remain such that