Allow RTS calls prior to initial route table load
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 4aefe4a..7a2bc96 100644 (file)
@@ -419,6 +419,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 }
 
 /*
 }
 
 /*
+       DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+               too.  This function likely will not behave as expected in SI, and we are pretty sure it
+               isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
@@ -455,22 +459,25 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
-               msg = rcv_msg( ctx, msg );                                      // hard wait for next
-               if( msg->state == RMR_OK ) {
-                       if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
-                               return msg;
-                       }
-
-                       if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
-                               errno = ENOBUFS;
-                               return NULL;
+               msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
+               if( msg != NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
+                       if( msg->state == RMR_OK ) {
+                               if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                                       return msg;
+                               }
+
+                               if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
+                                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+                                       errno = ENOBUFS;
+                                       return NULL;
+                               }
+
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+                               queued++;
+                               msg = NULL;
                        }
                        }
-
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
-                       queued++;
-                       msg = NULL;
                }
        }
 
                }
        }
 
@@ -623,7 +630,7 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
                port = proto_port;                      // assume something like "1234" was passed
        }
 
-       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 1 ) {       // must check here -- if < 1 then we just start static file 'listener'
+       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
                static_rtc = 1;
        }
 
                static_rtc = 1;
        }
 
@@ -695,9 +702,12 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
-       if( flags & RMRFL_NOTHREAD ) {                          // thread set to off; no route table collector started (could be called by the rtc thread itself)
-               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+       ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // create an empty route table so that wormhole/rts calls can be used
+       if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
+               ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
        } else {
        } else {
+               ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
+
                if( static_rtc ) {
                        rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
                if( static_rtc ) {
                        rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
@@ -770,11 +780,7 @@ extern int rmr_ready( void* vctx ) {
                return FALSE;
        }
 
                return FALSE;
        }
 
-       if( ctx->rtable != NULL ) {
-               return TRUE;
-       }
-
-       return FALSE;
+       return ctx->rtable_ready;
 }
 
 /*
 }
 
 /*
@@ -860,6 +866,8 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
+                       sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
@@ -1079,8 +1087,10 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
 
        // must vet call_id here, all others vetted by workhorse mt_call() function
        if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
 
        // must vet call_id here, all others vetted by workhorse mt_call() function
        if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
-               mbuf->state = RMR_ERR_BADARG;
-               mbuf->tp_state = EINVAL;
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = EINVAL;
+               }
                return mbuf;
        }
 
                return mbuf;
        }