X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frmr_si.c;h=7a2bc96d79aaadcc553fbbb8ec5350b1447f533f;hb=cf4413c47ce274d7fc08c3bcfc8c4de3d465ad4d;hp=7d7449f9de30ff0bffc475f2f2dac95f34490656;hpb=2d9d6784b306047e94ca9816813e5007b00fd17e;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 7d7449f..7a2bc96 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -370,7 +370,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) { return msg; } - return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec + return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec } /* @@ -419,6 +419,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { } /* + DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will + too. This function likely will not behave as expected in SI, and we are pretty sure it + isn't being used as there was an abort triggering reference to rmr_rcv() until now. + This blocks until the message with the 'expect' ID is received. Messages which are received before the expected message are queued onto the message ring. The function will return a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the @@ -455,22 +459,25 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect ); while( queued < allow2queue ) { - msg = rcv_msg( ctx, msg ); // hard wait for next - if( msg->state == RMR_OK ) { - if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued ); - return msg; - } - - if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full - if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" ); - errno = ENOBUFS; - return NULL; + msg = rmr_rcv_msg( ctx, msg ); // hard wait for next + if( msg != NULL ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state ); + if( msg->state == RMR_OK ) { + if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued ); + return msg; + } + + if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" ); + errno = ENOBUFS; + return NULL; + } + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype ); + queued++; + msg = NULL; } - - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype ); - queued++; - msg = NULL; } } @@ -623,7 +630,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } - if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 1 ) { // must check here -- if < 1 then we just start static file 'listener' + if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener' static_rtc = 1; } @@ -695,9 +702,12 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { // finish all flag setting before threads to keep helgrind quiet ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way - if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself) - ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used + ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used + if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases + ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case } else { + ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread + if( static_rtc ) { rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port ); if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader @@ -770,11 +780,7 @@ extern int rmr_ready( void* vctx ) { return FALSE; } - if( ctx->rtable != NULL ) { - return TRUE; - } - - return FALSE; + return ctx->rtable_ready; } /* @@ -860,6 +866,8 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW! if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued + clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expriry time + sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop) if( ombuf ) { rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now } @@ -874,6 +882,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { if( mbuf != NULL ) { mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src } + return mbuf; } @@ -974,12 +983,6 @@ static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_w return mbuf; } - if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them - mbuf->state = RMR_ERR_BADARG; - mbuf->tp_state = errno; - return mbuf; - } - ombuf = mbuf; // save to return timeout status with chute = &ctx->chutes[call_id]; @@ -1081,6 +1084,16 @@ static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_w This is now just an outward facing wrapper so we can support wormhole calls. */ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) { + + // must vet call_id here, all others vetted by workhorse mt_call() function + if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them + if( mbuf != NULL ) { + mbuf->state = RMR_ERR_BADARG; + mbuf->tp_state = EINVAL; + } + return mbuf; + } + return mt_call( vctx, mbuf, call_id, max_wait, NULL ); }