X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=17014a60bf9e11df8c17c842209c68fcc3aa745a;hb=0a58458afe875798f0ac3f367bd05e4ba523e115;hp=6be4ffb450402f7fcce103afac8e016dffdc9bd9;hpb=009378503e4621ab53f0839a409563b4a03e100f;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 6be4ffb..17014a6 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -204,7 +204,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off d1 = DATA1_ADDR( msg->header ); - d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end + d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end } return mtosend_msg( vctx, msg, max_to ); @@ -674,7 +674,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src if( ctx->my_ip == NULL ) { rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" ); - strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer + ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer } } if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip ); @@ -700,7 +700,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { // finish all flag setting before threads to keep helgrind quiet ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way - if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself) + if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself) ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used } else { if( static_rtc ) { @@ -875,6 +875,9 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { } } + if( mbuf != NULL ) { + mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src + } return mbuf; } @@ -919,6 +922,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued mbuf->state = RMR_OK; + mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src if( ombuf ) { rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring @@ -935,23 +939,19 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { return mbuf; } -/* - Accept a message buffer and caller ID, send the message and then wait - for the receiver to tickle the semaphore letting us know that a message - has been received. The call_id is a value between 2 and 255, inclusive; if - it's not in this range an error will be returned. Max wait is the amount - of time in millaseconds that the call should block for. If 0 is given - then no timeout is set. - If the mt_call feature has not been initialised, then the attempt to use this - funciton will fail with RMR_ERR_NOTSUPP - If no matching message is received before the max_wait period expires, a - nil pointer is returned, and errno is set to ETIMEOUT. If any other error - occurs after the message has been sent, then a nil pointer is returned - with errno set to some other value. + +/* + This is the work horse for the multi-threaded call() function. It supports + both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description + for for rmr_mt_call() modulo the caveat below. + + If endpoint is given, then we assume that we're not doing normal route table + routing and that we should send directly to that endpoint (probably worm + hole). */ -extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { +static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) { rmr_mbuf_t* ombuf; // original mbuf passed in uta_ctx_t* ctx; uta_mhdr_t* hdr; // header in the transport buffer @@ -1019,7 +1019,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m seconds = 1; // use as flag later to invoked timed wait } - mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + if( ep == NULL ) { // normal routing + mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success! + } else { + mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 ); + } if( mbuf ) { if( mbuf->state != RMR_OK ) { mbuf->tp_state = errno; @@ -1054,12 +1058,37 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m } mbuf = chute->mbuf; - mbuf->state = RMR_OK; + if( mbuf != NULL ) { + mbuf->state = RMR_OK; + } chute->mbuf = NULL; return mbuf; } +/* + Accept a message buffer and caller ID, send the message and then wait + for the receiver to tickle the semaphore letting us know that a message + has been received. The call_id is a value between 2 and 255, inclusive; if + it's not in this range an error will be returned. Max wait is the amount + of time in millaseconds that the call should block for. If 0 is given + then no timeout is set. + + If the mt_call feature has not been initialised, then the attempt to use this + funciton will fail with RMR_ERR_NOTSUPP + + If no matching message is received before the max_wait period expires, a + nil pointer is returned, and errno is set to ETIMEOUT. If any other error + occurs after the message has been sent, then a nil pointer is returned + with errno set to some other value. + + This is now just an outward facing wrapper so we can support wormhole calls. +*/ +extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { + return mt_call( vctx, mbuf, call_id, max_wait, NULL ); +} + + /* Given an existing message buffer, reallocate the payload portion to be at least new_len bytes. The message header will remain such that