Allow RTS calls prior to initial route table load
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 4aefe4a..7a2bc96 100644 (file)
@@ -419,6 +419,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 }
 
 /*
+       DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+               too.  This function likely will not behave as expected in SI, and we are pretty sure it
+               isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
@@ -455,22 +459,25 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
-               msg = rcv_msg( ctx, msg );                                      // hard wait for next
-               if( msg->state == RMR_OK ) {
-                       if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
-                               return msg;
-                       }
-
-                       if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
-                               errno = ENOBUFS;
-                               return NULL;
+               msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
+               if( msg != NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
+                       if( msg->state == RMR_OK ) {
+                               if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                                       return msg;
+                               }
+
+                               if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
+                                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+                                       errno = ENOBUFS;
+                                       return NULL;
+                               }
+
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+                               queued++;
+                               msg = NULL;
                        }
-
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
-                       queued++;
-                       msg = NULL;
                }
        }
 
@@ -623,7 +630,7 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
-       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 1 ) {       // must check here -- if < 1 then we just start static file 'listener'
+       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
                static_rtc = 1;
        }
 
@@ -695,9 +702,12 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
-       if( flags & RMRFL_NOTHREAD ) {                          // thread set to off; no route table collector started (could be called by the rtc thread itself)
-               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+       ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // create an empty route table so that wormhole/rts calls can be used
+       if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
+               ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
        } else {
+               ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
+
                if( static_rtc ) {
                        rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
@@ -770,11 +780,7 @@ extern int rmr_ready( void* vctx ) {
                return FALSE;
        }
 
-       if( ctx->rtable != NULL ) {
-               return TRUE;
-       }
-
-       return FALSE;
+       return ctx->rtable_ready;
 }
 
 /*
@@ -860,6 +866,8 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
+                       sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
@@ -1079,8 +1087,10 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
 
        // must vet call_id here, all others vetted by workhorse mt_call() function
        if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
-               mbuf->state = RMR_ERR_BADARG;
-               mbuf->tp_state = EINVAL;
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = EINVAL;
+               }
                return mbuf;
        }