}
/*
+ DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+ too. This function likely will not behave as expected in SI, and we are pretty sure it
+ isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
This blocks until the message with the 'expect' ID is received. Messages which are received
before the expected message are queued onto the message ring. The function will return
a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
while( queued < allow2queue ) {
- msg = rcv_msg( ctx, msg ); // hard wait for next
- if( msg->state == RMR_OK ) {
- if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
- return msg;
- }
-
- if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
- if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
- errno = ENOBUFS;
- return NULL;
+ msg = rmr_rcv_msg( ctx, msg ); // hard wait for next
+ if( msg != NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state );
+ if( msg->state == RMR_OK ) {
+ if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+ return msg;
+ }
+
+ if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+ errno = ENOBUFS;
+ return NULL;
+ }
+
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+ queued++;
+ msg = NULL;
}
-
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
- queued++;
- msg = NULL;
}
}
port = proto_port; // assume something like "1234" was passed
}
- if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 1 ) { // must check here -- if < 1 then we just start static file 'listener'
+ if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
static_rtc = 1;
}
// finish all flag setting before threads to keep helgrind quiet
ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
- if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself)
- ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
+ ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used
+ if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases
+ ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case
} else {
+ ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread
+
if( static_rtc ) {
rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
return FALSE;
}
- if( ctx->rtable != NULL ) {
- return TRUE;
- }
-
- return FALSE;
+ return ctx->rtable_ready;
}
/*
if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW!
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
+ clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expriry time
+ sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop)
if( ombuf ) {
rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
}
// must vet call_id here, all others vetted by workhorse mt_call() function
if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
- mbuf->state = RMR_ERR_BADARG;
- mbuf->tp_state = EINVAL;
+ if( mbuf != NULL ) {
+ mbuf->state = RMR_ERR_BADARG;
+ mbuf->tp_state = EINVAL;
+ }
return mbuf;
}