Fix potential race in route table load 94/4994/4 4.4.0
authorE. Scott Daniels <daniels@research.att.com>
Thu, 5 Nov 2020 14:11:04 +0000 (09:11 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Thu, 5 Nov 2020 15:29:33 +0000 (10:29 -0500)
If route tables are received in quick succession there is a potential
race condition that can cause the xAPPs thread to use a stale pointer
likely resulting in a segfault.

Issue-ID: RIC-674

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I5536d3f30646dca0ed375468719fa3cf0920c103
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
18 files changed:
CHANGES_CORE.txt
CMakeLists.txt
doc/src/rst.im
docs/rel-notes.rst
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/rt_generic_static.c
src/rmr/common/src/rtc_static.c
src/rmr/common/src/symtab.c
src/rmr/common/src/wormholes.c
src/rmr/si/include/rmr_si_private.h
src/rmr/si/src/rmr_si.c
src/rmr/si/src/rtable_si_static.c
src/rmr/si/src/sr_si_static.c
test/rmr_si_rcv_test.c
test/rmr_si_test.c
test/rt_static_test.c
test/symtab_test.c
test/test_ctx_support.c

index e991000..a3e1373 100644 (file)
@@ -5,6 +5,10 @@
 # API and build change  and fix summaries. Doc correctsions
 # and/or changes are not mentioned here; see the commit messages.
 
+2020 November 4; Version 4.4.0
+       Changes to address a potential race condition when route tables
+       arrive in quick succession.  (RIC-674)
+
 2020 October 30; Version 4.3.1
        Changes to address code analyser scans and two bug fixes identified
        while addressing the analysis data. (RIC-673)
index bd58d84..2ac15d7 100644 (file)
@@ -40,8 +40,8 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "4" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "3" )
-set( patch_level "1" )
+set( minor_version "4" )
+set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 2702fd7..5892ad3 100644 (file)
        .dv _ch8 56p
        .dv _ch10 70p
 
+
        .** Long strings of equals and dashes needed to make title/subtitle easier to generate. Multi
        .** line annotations for headers could be used, but the code is messy for what results in
-       .** 4 lines in the setup files.
+       .** 4 lines in the setup files.  CAUTION: tildas must be quoted with back-ticks.
        .**
        .dv many_equals ============================================================================================
        .dv many_dashes --------------------------------------------------------------------------------------------
-       .dv many_tildas ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+       .dv many_tildas `^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~`
+
 
        .gv semver
        .if &_major 1 >
index d244c0c..a37304b 100644 (file)
@@ -22,6 +22,14 @@ the need to leap frog versions ceased, and beginning with
 version 4.0.0, the RMR versions should no longer skip.
 
 
+2020 November 4; Version 4.4.0
+------------------------------
+
+Changes to address a potential race condition when route
+tables arrive in quick succession. (RIC-674)
+
+
+
 2020 October 30; Version 4.3.1
 ------------------------------
 
index 419145b..f7eb324 100644 (file)
@@ -30,6 +30,7 @@
 #define _rmr_agnostic_h
 
 #include <semaphore.h>                                 // needed to support some structs
+#include <pthread.h>
 
 typedef struct endpoint endpoint_t;            // place holder for structs defined in nano/nng private.h
 typedef struct uta_ctx  uta_ctx_t;
@@ -224,9 +225,12 @@ typedef struct {
        The route table.
 */
 typedef struct {
-       void*   hash;                   // hash table.
+       void*   hash;                   // hash table for msg type and meid.
+       void*   ephash;                 // hash for endpoint references
        int             updates;                // counter of update records received
        int             mupdates;               // counter of meid update records received
+       int             ref_count;              // num threads currently using
+       pthread_mutex_t*        gate;   // lock allowing update to ref counter
 } route_table_t;
 
 /*
@@ -309,16 +313,18 @@ static void collect_things( void* st, void* entry, char const* name, void* thing
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
 static endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid );
 static char* uta_fib( char const* fname );
-static route_table_t* uta_rt_init( );
-static route_table_t* uta_rt_clone( route_table_t* srt );
-static route_table_t* uta_rt_clone_all( route_table_t* srt );
+static route_table_t* uta_rt_init( uta_ctx_t* ctx  );
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all );
 static void uta_rt_drop( route_table_t* rt );
+static inline route_table_t* get_rt( uta_ctx_t* ctx );
 static endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group  );
 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all );
 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf );
 static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
+static void release_rt( uta_ctx_t* ctx, route_table_t* rt );
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
 
index ea7f01a..4f8fa12 100644 (file)
 #include <sys/stat.h>
 #include <unistd.h>
 #include <netdb.h>
+#include <pthread.h>
 
 #include <RIC_message_types.h>         // needed for route manager messages
 
+#define ALL 1
+#define SOME 0
 
 /*
        Passed to a symtab foreach callback to construct a list of pointers from
@@ -179,7 +182,7 @@ static void  rt_stats( route_table_t* rt ) {
        *counter = 0;
        rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
        rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
-       rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
+       rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
        rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
 
        rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
@@ -306,7 +309,7 @@ static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int s
        }
 }
 
-// ------------------------------------------------------------------------------------------------
+// ---- utility -----------------------------------------------------------------------------------
 /*
        Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
        must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
@@ -380,6 +383,26 @@ static char* ensure_nlterm( char* buf ) {
        return nb;
 }
 
+/*
+       Roll the new table into the active and the active into the old table. We
+       must have the lock on the active table to do this. It's possible that there
+       is no active table (first load), so we have to account for that (no locking).
+*/
+static void roll_tables( uta_ctx_t* ctx ) {
+       if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
+               pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
+               ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
+               ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
+               pthread_mutex_unlock( ctx->rtgate );
+       } else {
+               ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
+               ctx->rtable = ctx->new_rtable;                          // make new the active one
+       }
+
+       ctx->new_rtable = NULL;
+}
+
+// ------------ entry update functions ---------------------------------------------------------------
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
@@ -531,6 +554,8 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
        }
 }
 
+// -------------------------- parse functions --------------------------------------------------
+
 /*
        Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
        the 'owner' which should be the dns name or IP address of an enpoint
@@ -617,25 +642,29 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char
                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
                                uta_rt_drop( ctx->new_rtable );
-                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as end never made it
+                               ctx->new_rtable = NULL;
+                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
                        }
 
                        if( ctx->table_id != NULL ) {
                                free( ctx->table_id );
                        }
-                       if( ntoks >2 ) {
+                       if( ntoks > 2 ) {
                                ctx->table_id = strdup( clip( tokens[2] ) );
                        } else {
                                ctx->table_id = NULL;
                        }
-                       ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
+
+                       ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
                        ctx->new_rtable->mupdates = 0;
+
                        if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
                } else {
                        if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
                                if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
                                        if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
-                                               rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
+                                               rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
+                                                               ctx->new_rtable->mupdates, tokens[2] );
                                                snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
                                                send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                uta_rt_drop( ctx->new_rtable );
@@ -647,16 +676,17 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char
                                }
 
                                if( ctx->new_rtable ) {
-                                       uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                       ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                       ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                       ctx->new_rtable = NULL;
+                                       roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
                                        if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
                                        send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
 
                                        if( vlevel > 0 ) {
-                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
-                                               rt_stats( ctx->old_rtable );
+                                               if( ctx->old_rtable != NULL ) {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+                                                       rt_stats( ctx->old_rtable );
+                                               } else {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+                                               }
                                                rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
                                                rt_stats( ctx->rtable );
                                        }
@@ -800,15 +830,16 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        }
 
                                        if( ctx->new_rtable ) {
-                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                               ctx->new_rtable = NULL;
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+                                               roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
 
                                                if( vlevel > 0 ) {
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
-                                                       rt_stats( ctx->old_rtable );
+                                                       if( ctx->old_rtable != NULL ) {
+                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+                                                               rt_stats( ctx->old_rtable );
+                                                       } else {
+                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+                                                       }
                                                        rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
                                                        rt_stats( ctx->rtable );
                                                }
@@ -825,6 +856,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
 
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
+                                               ctx->new_rtable = NULL;
                                        }
 
                                        if( ctx->table_id != NULL ) {
@@ -836,9 +868,9 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                ctx->table_id = NULL;
                                        }
 
-                                       ctx->new_rtable = NULL;
-                                       ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
+                                       ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
                                        ctx->new_rtable->updates = 0;                                           // init count of entries received
+
                                        if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
                                }
                                break;
@@ -892,15 +924,16 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        }
 
                                        if( ctx->new_rtable ) {
-                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                               ctx->new_rtable = NULL;
+                                               roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
 
                                                if( vlevel > 0 ) {
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
-                                                       rt_stats( ctx->old_rtable );
+                                                       if( ctx->old_rtable != NULL ) {
+                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table:  (ref_count=%d)\n", ctx->old_rtable->ref_count );
+                                                               rt_stats( ctx->old_rtable );
+                                                       } else {
+                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+                                                       }
                                                        rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
                                                        rt_stats( ctx->rtable );
                                                }
@@ -912,17 +945,19 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
+                                               ctx->new_rtable = NULL;
                                        }
 
-                                       if( ntoks >2 ) {
+                                       if( ntoks > 2 ) {
                                                if( ctx->table_id != NULL ) {
                                                        free( ctx->table_id );
                                                }
                                                ctx->table_id = strdup( clip( tokens[2] ) );
                                        }
 
-                                       ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
-                                       ctx->new_rtable->updates = 0;                                           // init count of updates received
+                                       ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
+                                       ctx->new_rtable->updates = 0;                                                   // init count of updates received
+
                                        if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
                                }
                                break;
@@ -1113,15 +1148,20 @@ static char* uta_fib( char const* fname ) {
        return buf;
 }
 
+// --------------------- initialisation/creation ---------------------------------------------
 /*
        Create and initialise a route table; Returns a pointer to the table struct.
 */
-static route_table_t* uta_rt_init( ) {
+static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
        route_table_t*  rt;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
        if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
                return NULL;
        }
+
        memset( rt, 0, sizeof( *rt ) );
 
        if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
@@ -1129,6 +1169,10 @@ static route_table_t* uta_rt_init( ) {
                return NULL;
        }
 
+       rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
+       rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
+       pthread_mutex_init( rt->gate, NULL );
+
        return rt;
 }
 
@@ -1138,7 +1182,7 @@ static route_table_t* uta_rt_init( ) {
        Space is the space in the old table to copy. Space 0 uses an integer key and
        references rte structs. All other spaces use a string key and reference endpoints.
 */
-static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
+static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
        endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
        rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
        void*   sst;                    // source symtab
@@ -1147,9 +1191,12 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in
        int             i;
        int             free_on_err = 0;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
        if( nrt == NULL ) {                             // make a new table if needed
                free_on_err = 1;
-               nrt = uta_rt_init();
+               nrt = uta_rt_init( ctx );
                if( nrt == NULL ) {
                        return NULL;
                }
@@ -1198,70 +1245,89 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in
 }
 
 /*
-       Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
-       The endpoint and meid entries in the hash must be preserved.
-
-       NOTE: The first call to rt_clone_space() will create the new table and subsequent
-               calls operate on the new table. The return of subsequent calls can be safely
-               ignored.  There are some code analysers which will claim that there are memory
-               leaks here; not true as they aren't understanding the logic, just looking at
-               an ignored return value and assuming it's different than what was passed in.
+       Given a destination route table (drt), clone from the source (srt) into it.
+       If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
+       allocate the drt if that was nil too). If all is true (1), then we will clone both
+       the MT and the ME spaces; otherwise only the ME space is cloned.
 */
-static route_table_t* uta_rt_clone( route_table_t* srt ) {
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
        endpoint_t*             ep;                             // an endpoint
        rtable_ent_t*   rte;                    // a route table entry
-       route_table_t*  nrt = NULL;             // new route table
        int i;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
+       if( drt == NULL ) {
+               drt = uta_rt_init( ctx );
+       }
        if( srt == NULL ) {
-               return uta_rt_init();           // no source to clone, just return an empty table
+               return drt;
        }
 
-       nrt = rt_clone_space( srt, NULL, RT_NAME_SPACE );               // allocate a new one, add endpoint refs
-       rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
+       drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
+       rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
+       if( all ) {
+               rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
+       }
 
-       return nrt;
+       return drt;
 }
 
 /*
-       Creates a new route table and then clones  _all_ of the given route table (references
-       both endpoints AND the route table entries. Needed to support a partial update where
-       some route table entries will not be deleted if not explicitly in the update and when
-       we are adding/replacing meid references.
+       Prepares the "new" route table for populating. If the old_rtable is not nil, then
+       we wait for it's use count to reach 0. Then the table is cleared, and moved on the
+       context to be referenced by the new pointer; the old pointer is set to nil.
 
-       NOTE  see note in uta_rt_clone() as it applies here too.
+       If the old table doesn't exist, then a new table is created and the new pointer is
+       set to reference it.
 */
-static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
-       endpoint_t const*       ep;                     // an endpoint
-       rtable_ent_t const*     rte;            // a route table entry
-       route_table_t*  nrt = NULL;             // new route table
-       int i;
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
+       int counter = 0;
+       route_table_t*  rt;
 
-       if( srt == NULL ) {
-               return uta_rt_init();           // no source to clone, just return an empty table
+       if( ctx == NULL ) {
+               return NULL;
        }
 
-       nrt = rt_clone_space( srt, NULL, RT_MT_SPACE );                 // create new, clone all spaces to it
-       rt_clone_space( srt, nrt, RT_NAME_SPACE );
-       rt_clone_space( srt, nrt, RT_ME_SPACE );
+       if( (rt = ctx->old_rtable) != NULL ) {
+               ctx->old_rtable = NULL;
+               while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
+                       if( counter++ > 1000 ) {
+                               rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
+                               break;
+                       }
 
-       return nrt;
+                       usleep( 1000 );                                         // small sleep to yield the processer if that is needed
+               }
+
+               rmr_sym_clear( rt );                                    // clear all entries from the old table
+       } else {
+               rt = NULL;
+       }
+
+       rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
+       rt->ref_count = 0;                                                      // take no chances; ensure it's 0!
+
+       return rt;
 }
 
+
 /*
        Given a name, find the endpoint struct in the provided route table.
 */
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
 
-       if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
+       if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
                return NULL;
        }
 
-       return rmr_sym_get( rt->hash, ep_name, 1 );
+       return rmr_sym_get( rt->ephash, ep_name, 1 );
 }
 
 /*
        Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
+       Does NOT destroy the gate as it's a common gate for ALL route tables.
 */
 static void uta_rt_drop( route_table_t* rt ) {
        if( rt == NULL ) {
@@ -1301,7 +1367,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
                pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
                memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
 
-               rmr_sym_put( rt->hash, ep_name, 1, ep );
+               rmr_sym_put( rt->ephash, ep_name, 1, ep );
        }
 
        return ep;
@@ -1338,4 +1404,50 @@ static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid )
        return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
 }
 
+/*
+       This returns a pointer to the currently active route table and ups
+       the reference count so that the route table is not freed while it
+       is being used. The caller MUST call release_rt() when finished
+       with the pointer.
+
+       Care must be taken: the ctx->rtable pointer _could_ change during the time
+       between the release of the lock and the return. Therefore we MUST grab
+       the current pointer when we have the lock so that if it does we don't
+       return a pointer to the wrong table.
+
+       This will return NULL if there is no active table.
+*/
+static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
+       route_table_t*  rrt;                    // return value
+
+       if( ctx == NULL || ctx->rtable == NULL ) {
+               return NULL;
+       }
+
+       pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
+       rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
+       rrt->ref_count++;
+       pthread_mutex_unlock( ctx->rtgate );
+
+       return rrt;                                                                             // pointer we upped the count with
+}
+
+/*
+       This will "release" the route table by reducing the use counter
+       in the table. The table may not be freed until the counter reaches
+       0, so it's imparative that the pointer be "released" when it is
+       fetched by get_rt().  Once the caller has released the table it
+       may not safely use the pointer that it had.
+*/
+static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
+       if( ctx == NULL || rt == NULL ) {
+               return;
+       }
+
+       pthread_mutex_lock( ctx->rtgate );                              // must hold lock
+       if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
+               rt->ref_count--;
+       }
+       pthread_mutex_unlock( ctx->rtgate );
+}
 #endif
index c1bad15..2deef8e 100644 (file)
@@ -270,6 +270,11 @@ static void* rtc( void* vctx ) {
                return NULL;
        }
 
+       if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) {                // master hash table for endpoints (each rt will reference this)
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
+               return NULL;
+       }
+
        if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
                vfd = open( eptr, O_RDONLY );
                vlevel = refresh_vlevel( vfd );
index 18cf60a..c88c82c 100644 (file)
@@ -53,6 +53,7 @@ Mod:          2016 23 Feb - converted Symtab refs so that caller need only a
 #include <stdlib.h>
 #include <memory.h>
 #include <netdb.h>
+#include <pthread.h>
 
 #include "rmr_symtab.h"
 
@@ -96,7 +97,7 @@ static int sym_hash( const char *n, long size )
        return (int ) (t % size);
 }
 
-/* 
+/*
        Delete element pointed to by eptr which is assumed to be
        a member of the list at symtab[i].
 */
@@ -148,7 +149,7 @@ static void del_head_ele( Sym_tab *table, int hv ) {
                        sym_tab[hv]->prev = NULL;                                       // new head
                }
                eptr->next = NULL;                                                              // take no chances
-                       
+
                if( eptr->class && eptr->name ) {                               // class 0 entries are numeric, so name is NOT a pointer
                        free( (void *) eptr->name );
                }
@@ -179,8 +180,8 @@ static inline int same( unsigned int c1, unsigned int c2, const char *s1, const
        much the same.
 */
 static int putin( Sym_tab *table, const char *name, unsigned int class, void *val ) {
-       Sym_ele *eptr;                  /* pointer into hash table */
-       Sym_ele **sym_tab;      /* pointer into hash table */
+       Sym_ele *eptr;                  /* pointer into hash table */
+       Sym_ele **sym_tab;              /* pointer into hash table */
        int hv;                 /* hash value */
        int rc = 0;             /* assume it existed */
        uint64_t nkey = 0;              // numeric key if class == 0
@@ -196,7 +197,7 @@ static int putin( Sym_tab *table, const char *name, unsigned int class, void *va
                for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
        }
 
-       if( ! eptr ) {                  // not found above, so add
+       if( ! eptr ) {                  // not found above, so add
                rc++;
                table->inhabitants++;
 
@@ -502,13 +503,13 @@ extern void rmr_sym_foreach_class( void *vst, unsigned int class, void (* user_f
        Sym_ele **list;
        Sym_ele *se;
        Sym_ele *next;          /* allows user to delete the node(s) we return */
-       int     i;
+       int             i;
 
        st = (Sym_tab *) vst;
 
        if( st && (list = st->symlist) != NULL && user_fun != NULL ) {
                for( i = 0; i < st->size; i++ ) {
-                       se = list[i]; 
+                       se = list[i];
                        while( se ) {
                                next = se->next;                        // allow callback to delete from the list w/o borking us
                                if( class == se->class ) {
index 248748f..a177944 100644 (file)
@@ -187,6 +187,7 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
        rmr_whid_t      whid = -1;              // wormhole id is the index into the list
        wh_mgt_t*       whm;                    // easy reference to wh mgt stuff
        int                     i;
+       route_table_t*  rt;                     // the currently active route table
 
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || target == NULL || *target == 0 ) {
@@ -207,8 +208,10 @@ extern rmr_whid_t rmr_wh_open( void* vctx, char const* target ) {
 
        whm = ctx->wormholes;
 
-
-       if( (ep = rt_ensure_ep( ctx->rtable, target )) == NULL ) {              // get pointer to ep if there, create new if not
+       rt = get_rt( ctx );                                                                             // get and raise ref counter
+       ep = rt_ensure_ep( rt, target );                                                // get pointer to ep if there, create new if not
+       release_rt( ctx, rt );                                                                  // release use counter
+       if( ep == NULL ) {
                rmr_vlog( RMR_VL_ERR, "wormhole_open: ensure ep returned bad: target=(%s)\n", target );
                return -1;                      // ensure sets errno
        }
index fb4e79f..f125efb 100644 (file)
@@ -156,7 +156,10 @@ struct uta_ctx {
        int                     max_ibm;                // max size of an inbound message (river accum alloc size)
        void*           zcb_mring;              // zero copy buffer mbuf ring
        void*           fd2ep;                          // the symtab mapping file des to endpoints for cleanup on disconnect
+       void*           ephash;                         // hash  host:port or ip:port to endpoint struct
+
        pthread_mutex_t *fd2ep_gate;    // we must gate add/deletes to the fd2 symtab
+       pthread_mutex_t *rtgate;                // master gate for accessing/moving route tables
 };
 
 typedef uta_ctx_t uta_ctx;
index 2b96424..9b2444a 100644 (file)
@@ -702,7 +702,21 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
-       ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // create an empty route table so that wormhole/rts calls can be used
+
+       // ---------------- setup for route table collector before invoking ----------------------------------
+       ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
+       if( ctx->rtgate != NULL ) {
+               pthread_mutex_init( ctx->rtgate, NULL );
+       }
+
+       ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
+       if( ctx->ephash == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
+               free_ctx( ctx );
+               return NULL;
+       }
+
+       ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
        if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
                ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
        } else {
index f929295..e0e3dc6 100644 (file)
@@ -170,7 +170,6 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                return NULL;
        }
 
-       //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
        if( (rrg = rte->rrgroups[group]) == NULL ) {
                if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
@@ -186,7 +185,6 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
 
                rte->rrgroups[group] = rrg;
-               //fprintf( stderr, ">>>> added new rrg grp=%d to rte @ 0x%p  rrg=%p\n", group, rte, rte->rrgroups[group] );
 
                rrg->ep_idx = 0;                                                // next endpoint to send to
                rrg->nused = 0;                                                 // number populated
@@ -195,7 +193,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group );
        }
 
-       ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
+       ep = rt_ensure_ep( rt, ep_name );                       // get the ep and create one if not known
 
        if( rrg != NULL ) {
                if( rrg->nused >= rrg->nendpts ) {
@@ -214,27 +212,32 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
 
 
 /*
-       Given a name, find the nano socket needed to send to it. Returns the socket via
+       Given a name, find the socket fd needed to send to it. Returns the socket via
        the user pointer passed in and sets the return value to true (1). If the
        endpoint cannot be found false (0) is returned.
 */
 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
        route_table_t*  rt = NULL;
-       si_ctx_t*               si_ctx;
+       si_ctx_t*               si_ctx = NULL;
        endpoint_t*             ep;
        int                             state = FALSE;
 
        if( PARANOID_CHECKS ) {
-               if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
+               if( ctx == NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p\n", ctx, rt );
+                       return FALSE;
+               }
+               rt = get_rt( ctx );                             // get active rt and bump ref count
+               if( rt == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop rt=%p sictx=%p\n", rt, si_ctx );
                        return FALSE;
                }
        } else {
-               rt = ctx->rtable;                               // faster but more risky
+               rt = get_rt( ctx );                             // get active rt and bump ref count
                si_ctx = ctx->si_ctx;
        }
 
-       ep =  rmr_sym_get( rt->hash, ep_name, 1 );
+       ep =  rmr_sym_get( rt->ephash, ep_name, 1 );
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name );
        if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
                *uepp = ep;
@@ -242,9 +245,11 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
        if( ep == NULL ) {
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
                if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
+                       release_rt( ctx, rt );                                                  // drop ref count
                        return FALSE;
                }
        }
+       release_rt( ctx, rt );                                                                          // drop ref count
 
        if( ! ep->open )  {                                                                             // not open -- connect now
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
@@ -268,7 +273,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
 
 /*
        Make a round robin selection within a round robin group for a route table
-       entry. Returns the nanomsg socket if there is a rte for the message
+       entry. Returns the socket fd if there is a rte for the message
        key, and group is defined. Socket is returned via pointer in the parm
        list (nn_sock).
 
@@ -282,8 +287,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
        The return value is true (>0) if the socket was found and *nn_sock was updated
        and false (0) if there is no associated socket for the msg type, group combination.
        We return the index+1 from the round robin table on success so that we can verify
-       during test that different entries are being seleted; we cannot depend on the nng
-       socket being different as we could with nano.
+       during test that different entries are being seleted.
 
        NOTE:   The round robin selection index increment might collide with other
                threads if multiple threads are attempting to send to the same round
@@ -308,8 +312,6 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
                si_ctx = ctx->si_ctx;
        }
 
-       //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups );
-
        if( ! more ) {                          // eliminate cheks each time we need to use
                more = &dummy;
        }
index 119b88e..b56b6dc 100644 (file)
@@ -724,6 +724,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        int                     sock_ok;                        // got a valid socket from round robin select
        char*           d1;
        int                     ok_sends = 0;           // track number of ok sends
+       route_table_t*  rt;                             // active route table
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -748,7 +749,9 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                max_to = ctx->send_retries;             // convert to retries
        }
 
-       if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
+       rt = get_rt( ctx );                                                                             // get active route table and up ref count
+       if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) {                // find the entry which matches subid/type allow fallback to type only key
+               release_rt( ctx, rt );
                rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                msg->state = RMR_ERR_NOENDPT;
                errno = ENXIO;                                                                          // must ensure it's not eagain
@@ -762,7 +765,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
                        sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
                } else {
-                       sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+                       sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
                        send_again = 0;
                }
 
@@ -775,6 +778,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                        if( send_again ) {
                                clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                                if( clone_m == NULL ) {
+                                       release_rt( ctx, rt );
                                        msg->state = RMR_ERR_SENDFAILED;
                                        errno = ENOMEM;
                                        msg->tp_state = errno;
@@ -808,7 +812,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                                        case RMR_OK:
                                                ep->scounts[EPSC_GOOD]++;
                                                break;
-       
+
                                        case RMR_ERR_RETRY:
                                                ep->scounts[EPSC_TRANS]++;
                                                break;
@@ -826,6 +830,8 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                }
        }
 
+       release_rt( ctx, rt );                          // we can safely dec the ref counter now
+
        if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
                msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
                if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
index a77e741..2b95df7 100644 (file)
@@ -57,7 +57,6 @@
                                                                                        // specific test tools in this directory
 #undef NNG_UNDER_TEST 
 #include "test_support.c"                                      // things like fail_if()
-#include "test_ctx_support.c"                          // dummy context support
 #include "test_msg_support.c"
 #include "test_gen_rt.c"
 
@@ -72,6 +71,8 @@
 #include "rmr_si.c"
 #include "mbuf_api.c"
 
+#include "test_ctx_support.c"                          // dummy context support (needs rmr headers)
+
 
 static void gen_rt( uta_ctx_t* ctx );          // defined in sr_si_static_test, but used by a few others (eliminate order requirement below)
 
index 16d570f..3ae8d9d 100644 (file)
@@ -60,7 +60,6 @@
                                                                                        // specific test tools in this directory
 #undef NNG_UNDER_TEST 
 #include "test_support.c"                                      // things like fail_if()
-#include "test_ctx_support.c"                          // dummy context support
 #include "test_msg_support.c"
 #include "test_gen_rt.c"
 
@@ -75,6 +74,8 @@
 #include "rmr_si.c"
 #include "mbuf_api.c"
 
+#include "test_ctx_support.c"                          // dummy context support
+
 
 static void gen_rt( uta_ctx_t* ctx );          // defined in sr_si_static_test, but used by a few others (eliminate order requirement below)
 
index 15c6dee..8fd91ac 100644 (file)
@@ -193,7 +193,11 @@ static int rt_test( ) {
        entries[enu].group = 0; entries[enu].ep_name = "localhost:5512"; enu++;
 
 
-       rt = uta_rt_init( );                                                                            // get us a route table
+       rt = uta_rt_init( NULL );
+       errors += fail_if_false( rt == NULL, "rt_init given a nil context didn't return nil" );
+
+       ctx = mk_dummy_ctx();           // make a dummy with rtgate mutex
+       rt = uta_rt_init( ctx );                                                                                // get us a route table
        if( (errors += fail_if_nil( rt, "pointer to route table" )) ) {
                fprintf( stderr, "<FAIL> abort: cannot continue without a route table\n" );
                exit( 1 );
@@ -225,7 +229,7 @@ static int rt_test( ) {
        // ----- end hacking together a route table ---------------------------------------------------
 
 
-       crt = uta_rt_clone( rt );                                                               // clone only the endpoint entries
+       crt = uta_rt_clone( ctx, rt, NULL, 0 );                                                         // create a new rt and clone only the me entries
        errors += fail_if_nil( crt, "cloned route table" );
        if( crt ) {
                c1 = count_entries( rt, 1 );
@@ -234,12 +238,14 @@ static int rt_test( ) {
 
                c2 = count_entries( crt, 0 );
                errors += fail_not_equal( c2, 0, "cloned (endpoints) table entries space 0 count (a) was not zero as expected" );
+
+               errors += fail_if_false( crt->ephash == rt->ephash, "ephash pointer in cloned table is not right" );
                uta_rt_drop( crt );
        }
 
 
-       crt = uta_rt_clone_all( rt );                                                   // clone all entries
-       errors += fail_if_nil( crt, "cloned all route table" );
+       crt = uta_rt_clone( ctx, rt, NULL, 1 );                                                 // clone all entries (MT and ME)
+       errors += fail_if_nil( crt, "cloned (all) route table" );
 
        if( crt ) {
                c1 = count_entries( rt, 0 );
@@ -249,6 +255,8 @@ static int rt_test( ) {
                c1 = count_entries( rt, 1 );
                c2 = count_entries( crt, 1 );
                errors += fail_not_equal( c1, c2, "cloned (all) table entries space 1 count (b) did not match original table count (a)" );
+
+               errors += fail_if_false( crt->ephash == rt->ephash, "ephash pointer in cloned table (all) is not right" );
                uta_rt_drop( crt );
        }
 
@@ -275,7 +283,7 @@ static int rt_test( ) {
        #ifdef NNG_UNDER_TEST
                state = uta_epsock_byname( rt, "localhost:4561", &nn_sock, &ep );               // this should be found
        #else
-               state = uta_epsock_byname( ctx, "localhost:4561", &nn_sock, &ep );      // this should be found
+               state = uta_epsock_byname( ctx, "localhost:4561", &nn_sock, &ep );              // this should be found
        #endif
        errors += fail_if_equal( state, 0, "socket (by name)" );
        errors += fail_if_nil( ep, "epsock_byname did not populate endpoint pointer when expected to" );
@@ -289,13 +297,15 @@ static int rt_test( ) {
        #endif
        errors += fail_not_equal( state, 0, "socket (by name) nil check returned true" );
 
-       ep->open = 1;
-       #if  NNG_UNDER_TEST
-               state = uta_epsock_byname( rt, "localhost:4561", &nn_sock, NULL );              // test coverage on nil checks
-       #else
-               state = uta_epsock_byname( ctx, "localhost:4561", &nn_sock, NULL );
-       #endif
-       errors += fail_if_equal( state, 0, "socket (by name) open ep check returned false" );
+       if( ep ) {                                      // if previous test fails, cant run this
+               ep->open = 1;
+               #if  NNG_UNDER_TEST
+                       state = uta_epsock_byname( rt, "localhost:4561", &nn_sock, NULL );              // test coverage on nil checks
+               #else
+                       state = uta_epsock_byname( ctx, "localhost:4561", &nn_sock, NULL );
+               #endif
+               errors += fail_if_equal( state, 0, "socket (by name) open ep check returned false" );
+       }
 
 
        // --- test that the get_rte function finds expected keys, and retries to find 'bad' sid attempts for valid mtypes with no sid
@@ -372,7 +382,7 @@ static int rt_test( ) {
        errors += fail_if_true( state, "uta_epsock_rr returned bad (non-zero) state when given nil socket pointer" );
 
 
-       uta_rt_clone( NULL );                                                           // verify null parms don't crash things
+       uta_rt_clone( ctx, NULL, NULL, 0 );                                                             // verify null parms don't crash things
        uta_rt_drop( NULL );
        #ifdef NNG_UNDER_TEST
                uta_epsock_rr( NULL, 0,  &more, &nn_sock, &ep );                        // drive null case for coverage
@@ -405,6 +415,7 @@ static int rt_test( ) {
                free( buf );
        }
 
+fprintf( stderr, ">>>>>> test is overtly dropping rt table at %p\n", rt );
        uta_rt_drop( rt );
        rt = NULL;
 
index 1742e01..8cb8b15 100644 (file)
                                the symbol table portion of RMr.  Run with:
                                        ksh unit_test.ksh symtab_test.c
        Date:           1 April 2019
-       Author:         E. Scott Daniels
+       Author:         E. Scott Daniels
 */
 
+#include <pthread.h>
+
 #define NO_DUMMY_RMR 1                 // no dummy rmr functions; we don't pull in rmr.h or agnostic.h
 #define NO_EMULATION
 #define NO_PRIVATE_HEADERS
@@ -39,6 +41,7 @@
 #include "symtab.c"                                                    // module under test
 
 
+int terrors = 0;                                                       // thread errors
 int state = GOOD;                                                      // overall pass/fail state 0==fail
 int counter;                                                           // global counter for for-each tests
 
@@ -89,6 +92,99 @@ static int nfetch( void* st, int key, int expected ) {
        return error;
 }
 
+// ----------------- thread based tests -------------------------------------------------------------------
+#define NUM_KEYS       512                                     // number of unique keys
+#define NUM_ATTEMPTS   1000000
+
+/*
+       This is started in a thread and will attempt 10,000 reads on the symtable
+       in an attempt to ensure that there are no concurrent read/write issues.
+*/
+static void* reader( void* st ) {
+       char    key[1024];
+       int             i;
+       int             ncount = 0;                     // number not found
+       int             fcount = 0;                     // number found
+
+       for( i = 0; i < NUM_ATTEMPTS; i++ ) {
+               snprintf( key, sizeof( key ), "key_%d", i % NUM_KEYS );
+               if( rmr_sym_get( st, key, 1 ) == NULL ) {
+                       ncount++;
+               } else {
+                       fcount++;
+               }
+       }
+
+       fprintf( stderr, "<info> reader finished: n=%d f=%d\n", ncount, fcount );       // there is no right answer
+       return NULL;
+}
+
+/*
+       This is started in a thread and will attempt 10,000 writes on the symtable
+       in an attempt to ensure that there are no concurrent read/write issues. Keys are
+       written as key_n where n is an integer between 0 and 999 inclusive.
+*/
+static void* writer( void* st ) {
+       char    key[1024];
+       int             i;
+       int             ncount = 0;                     // number first inserts
+       int             rcount = 0;                     // number replacements
+       char*   value = NULL;
+       int             num_keys = 256;
+
+       fprintf( stderr, "<INFO> writer now turning\n" );
+       for( i = 0; i < NUM_ATTEMPTS; i++ ) {
+               value++;
+               snprintf( key, sizeof( key ), "key_%d", i % NUM_KEYS );
+               rmr_sym_del( st, key, 1 );
+               if( rmr_sym_put( st, key, 1, value )  ) {
+                       ncount++;
+               } else {
+                       rcount++;
+               }
+       }
+
+       if( ncount != NUM_ATTEMPTS ) {
+               fprintf( stderr, "<FAIL> writer finished: n=%d r=%d\n", ncount, rcount );       // there is no right answer
+               terrors++;
+       } else {
+               fprintf( stderr, "<INFO> writer finished: n=%d r=%d\n", ncount, rcount );       // there is no right answer
+       }
+
+       return NULL;
+}
+
+/*
+       Drive a concurrent read/write test to ensure no race issues.
+*/
+static int thread_test( ) {
+       pthread_t       tids[10];
+       int                     n2start = 3;
+       int                     i;
+       void*           st;
+
+       st = rmr_sym_alloc( 128 );                      // should force collisions
+
+       fprintf( stderr, "<INFO> starting writer\n" );
+       pthread_create( &tids[0], NULL, writer, st );
+
+       for( i = 1; i <= n2start; i++ ) {
+               fprintf( stderr, "<INFO> starting reader %d\n", i );
+               pthread_create( &tids[i], NULL, reader, st );
+       }
+
+       fprintf( stderr, "<INFO> thread controller is waiting\n" );
+       for( i = 0; i <= n2start; i++ ) {
+               pthread_join( tids[i], NULL );                          // status is unimportant, just hold until all are done
+               fprintf( stderr, "<INFO> thread %d  has reported complete\n", i );
+       }
+
+
+       rmr_sym_stats( st, 1 );
+       return terrors;
+}
+
+// ---------------------------------------------------------------------------------------------------------
 
 /*
        Driven by foreach class -- just incr the counter.
@@ -158,6 +254,8 @@ int main( ) {
                fprintf( stderr, "<FAIL> %d errors in symtab code\n\n", errors );
        }
 
+       errors += thread_test();
+
        return !!(state + errors);
 }
 
index 2a66c04..d15239a 100644 (file)
@@ -61,15 +61,22 @@ static inline uta_ctx_t *mk_dummy_ctx() {
        if( ctx == NULL ) {
                return NULL;
        }
-
        memset( ctx, 0, sizeof( *ctx ) );
 
+       ctx->ephash = rmr_sym_alloc( 129 );
+
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
        ctx->zcb_mring = uta_mk_ring( 128 );            // zero copy buffer mbuf ring to reduce malloc/free calls
        ctx->si_ctx = malloc( 1024 );
        ctx->my_name = strdup( "hostname1" );
        ctx->my_ip = strdup( "123.45.67.89" );
 
+       ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );
+    if( ctx->rtgate != NULL ) {
+        pthread_mutex_init( ctx->rtgate, NULL );
+    }
+
+
        return ctx;
 }