Fix crash under SI95 with multiple receive threads
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 8e499c1..0a4f806 100644 (file)
 #include "si95/socket_if.h"
 #include "si95/siproto.h"
 
+#define SI95_BUILD     1                       // we drop some common functions for si
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
-#include "rmr_si_private.h"    // things that we need too
+#include "rmr_si_private.h"            // things that we need too
 #include "rmr_symtab.h"
+#include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
-#include "rtc_si_static.c"                     // specific RMR only route table collector (SI only for now)
+#include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
@@ -183,7 +185,10 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
+                       mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
                }
+
+               mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
 }
@@ -271,7 +276,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        errno = 0;                                                                                                              // at this point any bad state is in msg returned
        if( msg->header == NULL ) {
-               fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+               rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
                msg->tp_state = errno;
                return msg;
@@ -285,7 +290,8 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 */
        if( (nn_sock = msg->rts_fd) < 0 ) {
                if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
                }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
@@ -293,7 +299,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                }
        }
 
-
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
@@ -447,42 +452,42 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( exp_len > RMR_MAX_XID ) {
                exp_len = RMR_MAX_XID;
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
                msg = rcv_msg( ctx, msg );                                      // hard wait for next
                if( msg->state == RMR_OK ) {
                        if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
                                return msg;
                        }
 
                        if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
                                errno = ENOBUFS;
                                return NULL;
                        }
 
-                       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
                        queued++;
                        msg = NULL;
                }
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
        errno = ETIMEDOUT;
        return NULL;
 }
 
 /*
        Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
-       _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
+       _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
        but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
        after every 1K send attempts until the "time" value is reached. Retries are abandoned
-       if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
+       if NNG returns anything other than EAGAIN or EINTER is returned.
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
@@ -511,7 +516,7 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
        CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
-       fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
+       rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
        return 0;
 }
 
@@ -522,11 +527,15 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
        invokes this.  Internal functions (the route table collector) which need additional
        open ports without starting additional route table collectors, will invoke this
        directly with the proper flag.
+
+       CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
+                               that we know about. The _user_ should ensure that the supplied length also
+                               includes the trace data length maximum as they are in control of that.
 */
 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
-       char    bind_info[NNG_MAXADDRLEN];      // bind info
+       char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
@@ -537,12 +546,17 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
        int             state;
        int             i;
+       int             old_vlevel;
+
+       old_vlevel = rmr_vlog_init();                   // initialise and get the current level
+       rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
+       rmr_set_vlevel( old_vlevel );           // return logging to the desired state
 
        errno = 0;
        if( uproto_port == NULL ) {
@@ -557,7 +571,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
        ctx->nrivers = 256;                                                             // number of input flows we'll manage
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
@@ -567,12 +581,21 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
-       ctx->max_ibm = max_msg_size;                                    // default to user supplied message size
+       ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
+       ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
+       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
+
+       if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
+               uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
+               uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+       } else {
+               rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
+       }
        init_mtcall( ctx );                                                             // set up call chutes
+       fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
 
-       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
@@ -584,7 +607,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
-               fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
+               rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
                free_ctx( ctx );
                return NULL;
        }
@@ -621,7 +644,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                free( tok );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-                       fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
                        return NULL;
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
@@ -631,7 +654,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
                return NULL;
        }
 
@@ -647,11 +670,11 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
+                       rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
                        strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
@@ -666,26 +689,28 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        
        snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
                free_ctx( ctx );
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need a RTC
+       if( flags & FL_NOTHREAD ) {                                     // thread set to off; no rout table collector started (could be called by the rtc thread itself)
+               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+       } else {
                if( static_rtc ) {
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
-                               fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
                        }
                } else {
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
-                               fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
                        }
                }
        }
 
        ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
-               fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
        }
 
        free( proto_port );
@@ -766,7 +791,7 @@ extern int rmr_get_rcvfd( void* vctx ) {
 
 /*
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+               rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
                return -1;
        }
 */
@@ -886,7 +911,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                mbuf = ombuf;                           // return caller's buffer if they passed one in
        } else {
                errno = 0;                                              // interrupted call state could be left; clear
-               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
                        mbuf->state = RMR_OK;
 
@@ -1030,6 +1055,34 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        return mbuf;
 }
 
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender. 
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the 
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone 
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is 
+       copied to the reallocated payload.  If the flag is not set, then the 
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
+
 /*
        Enable low latency things in the transport (when supported).
 */