Allow RTS calls prior to initial route table load
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 8606241..7a2bc96 100644 (file)
        Clean up a context.
 */
 static void free_ctx( uta_ctx_t* ctx ) {
-       if( ctx ) {
-               if( ctx->rtg_addr ) {
-                       free( ctx->rtg_addr );
-               }
+       if( ctx && ctx->rtg_addr ) {
+               free( ctx->rtg_addr );
        }
 }
 
@@ -175,14 +173,14 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
-       //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
-       //return;
-
        if( mbuf == NULL ) {
                return;
        }
 
-       if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
+       if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
+               ! mbuf->ring ||                                                                         // cant cache if no ring
+               ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
+
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
                        mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
@@ -263,7 +261,6 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        int                     nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
-       int                     state;
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
@@ -373,7 +370,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
-       return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+    return mt_call( vctx, msg, 1, 1000, NULL );                // use the reserved call-id of 1 and wait up to 1 sec
 }
 
 /*
@@ -422,6 +419,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 }
 
 /*
+       DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+               too.  This function likely will not behave as expected in SI, and we are pretty sure it
+               isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
@@ -458,22 +459,25 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
-               msg = rcv_msg( ctx, msg );                                      // hard wait for next
-               if( msg->state == RMR_OK ) {
-                       if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
-                               return msg;
-                       }
-
-                       if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
-                               errno = ENOBUFS;
-                               return NULL;
+               msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
+               if( msg != NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
+                       if( msg->state == RMR_OK ) {
+                               if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                                       return msg;
+                               }
+
+                               if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
+                                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+                                       errno = ENOBUFS;
+                                       return NULL;
+                               }
+
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+                               queued++;
+                               msg = NULL;
                        }
-
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
-                       queued++;
-                       msg = NULL;
                }
        }
 
@@ -535,7 +539,7 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
                                that we know about. The _user_ should ensure that the supplied length also
                                includes the trace data length maximum as they are in control of that.
 */
-static void* init(  char* uproto_port, int max_msg_size, int flags ) {
+static void* init(  char* uproto_port, int def_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
        char    bind_info[256];                         // bind info
@@ -584,7 +588,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
-       ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
+       ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
        ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
@@ -601,8 +605,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
-       if( max_msg_size > 0 ) {
-               ctx->max_plen = max_msg_size;
+       if( def_msg_size > 0 ) {
+               ctx->max_plen = def_msg_size;
        }
 
        // we're using a listener to get rtg updates, so we do NOT need this.
@@ -626,10 +630,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
-       if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
-               if( atoi( tok ) < 1 ) {
-                       static_rtc = 1;
-               }
+       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
+               static_rtc = 1;
        }
 
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
@@ -674,7 +676,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
-                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
@@ -700,14 +702,19 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
-       if( flags & RMRFL_NOTHREAD ) {                          // thread set to off; no route table collector started (could be called by the rtc thread itself)
-               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+       ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // create an empty route table so that wormhole/rts calls can be used
+       if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
+               ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
        } else {
+               ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
+
                if( static_rtc ) {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
                        }
                } else {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
                        }
@@ -738,8 +745,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                without drastically changing anything. The user should invoke with RMRFL_NONE to
                avoid any misbehavour as there are internal flags which are suported
 */
-extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
-       return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
+extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
+       return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
 /*
@@ -773,11 +780,7 @@ extern int rmr_ready( void* vctx ) {
                return FALSE;
        }
 
-       if( ctx->rtable != NULL ) {
-               return TRUE;
-       }
-
-       return FALSE;
+       return ctx->rtable_ready;
 }
 
 /*
@@ -840,7 +843,6 @@ extern void rmr_close( void* vctx ) {
 */
 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        uta_ctx_t*      ctx;
-       uta_mhdr_t*     hdr;                    // header in the transport buffer
        chute_t*        chute;
        struct timespec ts;                     // time info if we have a timeout
        long    new_ms;                         // adjusted mu-sec
@@ -864,6 +866,8 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
+                       sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
@@ -878,6 +882,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                if( mbuf != NULL ) {
                        mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
                }
+
                return mbuf;
        }
 
@@ -978,12 +983,6 @@ static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_w
                return mbuf;
        }
 
-       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
-               mbuf->state = RMR_ERR_BADARG;
-               mbuf->tp_state = errno;
-               return mbuf;
-       }
-
        ombuf = mbuf;                                                                                                   // save to return timeout status with
 
        chute = &ctx->chutes[call_id];
@@ -1085,6 +1084,16 @@ static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_w
        This is now just an outward facing wrapper so we can support wormhole calls.
 */
 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+
+       // must vet call_id here, all others vetted by workhorse mt_call() function
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = EINVAL;
+               }
+               return mbuf;
+       }
+
        return mt_call( vctx, mbuf, call_id, max_wait, NULL );
 }