Enable multi-thread receiver support
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index b1bfd1f..c8c3b4a 100644 (file)
@@ -65,8 +65,7 @@
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
-#include "rtc_static.c"                                // route table collector
-#include "rtc_si_static.c"                     // our private test function
+#include "rtc_si_static.c"                     // specific RMR only route table collector (SI only for now)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
@@ -527,7 +526,7 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
-       char    bind_info[NNG_MAXADDRLEN];      // bind info
+       char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
@@ -535,11 +534,12 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        char*   tok2;
+       int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
        int             state;
        int             i;
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -570,9 +570,16 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        ctx->max_ibm = max_msg_size;                                    // default to user supplied message size
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
+       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
+
+       if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
+               uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
+               uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+       } else {
+               fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" );
+       }
        init_mtcall( ctx );                                                             // set up call chutes
 
-       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
@@ -600,6 +607,12 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
+       if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
+               if( atoi( tok ) < 1 ) {
+                       static_rtc = 1;
+               }
+       }
+
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
                tok = strdup( tok );                                    // something we can destroy
                if( *tok == '[' ) {                                             // we allow an ipv6 address here
@@ -665,13 +678,18 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need an rtc
-               if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread
-                       fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need a RTC
+               if( static_rtc ) {
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
+                               fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+                       }
+               } else {
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
+                               fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+                       }
                }
        }
 
-       //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx );
        ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
                fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
@@ -1019,6 +1037,34 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        return mbuf;
 }
 
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender. 
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the 
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone 
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is 
+       copied to the reallocated payload.  If the flag is not set, then the 
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
+
 /*
        Enable low latency things in the transport (when supported).
 */