feature(routes): Add rtable update
[ric-plt/lib/rmr.git] / src / common / src / rt_generic_static.c
index 5bbacbd..64cf47f 100644 (file)
@@ -57,9 +57,75 @@ typedef struct thing_list {
        void** things;
 } thing_list_t;
 
-// ------------------------------------------------------------------------------------------------
+// ---- debugging/testing -------------------------------------------------------------------------
+
+/*
+       Dump stats for an endpoint in the RT.
+*/
+static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
+       int*    counter;
+       endpoint_t* ep;
+
+       if( (ep = (endpoint_t *) thing) == NULL ) {
+               return;
+       }
 
+       if( (counter = (int *) vcounter) != NULL ) {
+               (*counter)++;
+       }
+
+       fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
+}
 
+/*
+       Dump stats for a route entry in the table.
+*/
+static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
+       int*    counter;
+       rtable_ent_t* rte;                      // thing is really an rte
+       int             mtype;
+       int             sid;
+
+       if( (rte = (rtable_ent_t *) thing) == NULL ) {
+               return;
+       }
+
+       if( (counter = (int *) vcounter) != NULL ) {
+               (*counter)++;
+       }
+
+       mtype = rte->key & 0xffff;
+       sid = (int) (rte->key >> 32);
+
+       fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
+}
+
+/*
+       Given a route table, cause some stats to be spit out.
+*/
+static void  rt_stats( route_table_t* rt ) {
+       int* counter;
+
+       if( rt == NULL ) {
+               fprintf( stderr, "[DBUG] rtstats: nil table\n" );
+               return;
+       }
+
+       counter = (int *) malloc( sizeof( int ) );
+       *counter = 0;
+       fprintf( stderr, "[DBUG] rtstats:\n" );
+       rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter );                // run endpoints in the active table
+       fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
+
+       *counter = 0;
+       rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter );               // run entries
+       fprintf( stderr, "[DBUG] %d entries\n", *counter );
+
+       free( counter );
+}
+
+
+// ------------------------------------------------------------------------------------------------
 /*
        Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
        must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
@@ -95,6 +161,7 @@ static char* clip( char* buf ) {
 */
 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
        rtable_ent_t* rte;
+       rtable_ent_t* old_rte;          // entry which was already in the table for the key
 
        if( rt == NULL ) {
                return NULL;
@@ -105,6 +172,8 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups
                return NULL;
        }
        memset( rte, 0, sizeof( *rte ) );
+       rte->refs = 1;
+       rte->key = key;
 
        if( nrrgroups <= 0 ) {
                nrrgroups = 10;
@@ -118,9 +187,13 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups
        memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
        rte->nrrgroups = nrrgroups;
 
+       if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
+               del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
+       }
+
        rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups );
+       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
        return rte;
 }
 
@@ -159,7 +232,7 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
 
                        key = build_rt_key( subid, atoi( ts_field ) );
 
-                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key );
+                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
 
                        if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
                                rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
@@ -179,6 +252,48 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
                }
 }
 
+/*
+       Trash_entry takes a partially parsed record from the input and
+       will delete the entry if the sender,mtype matches us or it's a
+       generic mtype. The refernce in the new table is removed and the
+       refcounter for the actual rte is decreased. If that ref count is
+       0 then the memory is freed (handled byh the del_rte call).
+*/
+static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
+       rtable_ent_t*   rte;            // route table entry to be 'deleted'
+       char*   tok;
+       int             ntoks;
+       uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
+       char*   tokens[128];
+
+       if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
+               return;
+       }
+
+       ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
+
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+               (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
+               has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
+
+               key = build_rt_key( subid, atoi( ts_field ) );
+               rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
+               if( rte != NULL ) {
+                       if( DEBUG || (vlevel > 1) ) {
+                                fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
+                       }
+                       rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
+                       del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
+               } else {
+                       if( DEBUG || (vlevel > 1) ) {
+                               fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+                       }
+               }
+       } else {
+               if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
+       }
+}
+
 /*
        Parse a single record recevied from the route table generator, or read
        from a static route table file.  Start records cause a new table to
@@ -219,6 +334,20 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                        case '#':                                                                                               // and comment lines
                                break;
 
+                       case 'd':                                                                                               // del | [sender,]mtype | sub-id
+                               if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
+                                       break;
+                               }
+
+                               if( ntoks < 3 ) {
+                                       if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
+                                       break;
+                               }
+
+                               trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
+                               ctx->new_rtable->updates++;
+                               break;
+
                        case 'n':                                                                                               // newrt|{start|end}
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
@@ -228,6 +357,13 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                                ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
                                                ctx->new_rtable = NULL;
                                                if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
+
+                                               if( vlevel > 0 ) {
+                                                       fprintf( stderr, "[DBUG] old route table:\n" );
+                                                       rt_stats( ctx->old_rtable );
+                                                       fprintf( stderr, "[DBUG] new route table:\n" );
+                                                       rt_stats( ctx->rtable );
+                                               }
                                        } else {
                                                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
@@ -258,6 +394,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                }
 
                                build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
+                               ctx->new_rtable->updates++;
                                break;
 
                        case 'r':                                       // assume rt entry
@@ -265,6 +402,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                        break;
                                }
 
+                               ctx->new_rtable->updates++;
                                if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
                                        build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
                                } else {
@@ -272,6 +410,53 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                }
                                break;
 
+                       case 'u':                                                                                               // update current table, not a total replacement
+                               tokens[1] = clip( tokens[1] );
+                               if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
+                                       if( ntoks >2 ) {
+                                               if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
+                                                       fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+                                                               ctx->new_rtable->updates, tokens[2] );
+                                                       uta_rt_drop( ctx->new_rtable );
+                                                       ctx->new_rtable = NULL;
+                                                       break;
+                                               }
+                                       }
+
+                                       if( ctx->new_rtable ) {
+                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
+                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
+                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
+                                               ctx->new_rtable = NULL;
+                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
+
+                                               if( vlevel > 0 ) {
+                                                       fprintf( stderr, "[DBUG] old route table:\n" );
+                                                       rt_stats( ctx->old_rtable );
+                                                       fprintf( stderr, "[DBUG] updated route table:\n" );
+                                                       rt_stats( ctx->rtable );
+                                               }
+                                       } else {
+                                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
+                                               ctx->new_rtable = NULL;
+                                       }
+                               } else {                                                                                        // start a new table.
+                                       if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
+                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
+                                               uta_rt_drop( ctx->new_rtable );
+                                       }
+
+                                       if( ctx->rtable )  {
+                                               ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
+                                       } else {
+                                               ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
+                                       }
+
+                                       ctx->new_rtable->updates = 0;                                           // init count of updates received
+                                       if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
+                               }
+                               break;
+
                        default:
                                if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
                                break;
@@ -341,6 +526,18 @@ static void collect_things( void* st, void* entry, char const* name, void* thing
        Called to delete a route table entry struct. We delete the array of endpoint
        pointers, but NOT the endpoints referenced as those are referenced from
        multiple entries.
+
+       Route table entries can be concurrently referenced by multiple symtabs, so
+       the actual delete happens only if decrementing the rte's ref count takes it
+       to 0. Thus, it is safe to call this function across a symtab when cleaning up
+       the symtab, or overlaying an entry.
+
+       This function uses ONLY the pointer to the rte (thing) and ignores the other
+       information that symtab foreach function passes (st, entry, and data) which
+       means that it _can_ safetly be used outside of the foreach setting. If
+       the function is changed to depend on any of these three, then a stand-alone
+       rte_cleanup() function should be added and referenced by this, and refererences
+       to this outside of the foreach world should be changed.
 */
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
        rtable_ent_t*   rte;
@@ -350,6 +547,11 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
                return;
        }
 
+       rte->refs--;
+       if( rte->refs > 0 ) {                   // something still referencing, so it lives
+               return;
+       }
+
        if( rte->rrgroups ) {                                                                   // clean up the round robin groups
                for( i = 0; i < rte->nrrgroups; i++ ) {
                        if( rte->rrgroups[i] ) {
@@ -452,7 +654,6 @@ static route_table_t* uta_rt_init( ) {
 static route_table_t* uta_rt_clone( route_table_t* srt ) {
        endpoint_t*             ep;             // an endpoint
        route_table_t*  nrt;    // new route table
-       route_table_t*  art;    // active route table
        void*   sst;                    // source symtab
        void*   nst;                    // new symtab
        thing_list_t things;
@@ -494,6 +695,74 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) {
        return nrt;
 }
 
+/*
+       Clones _all_ of the given route table (references both endpoints AND the route table
+       entries. Needed to support a partial update where some route table entries will not
+       be deleted if not explicitly in the update.
+*/
+static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
+       endpoint_t*             ep;             // an endpoint
+       rtable_ent_t*   rte;    // a route table entry
+       route_table_t*  nrt;    // new route table
+       void*   sst;                    // source symtab
+       void*   nst;                    // new symtab
+       thing_list_t things0;   // things from space 0 (table entries)
+       thing_list_t things1;   // things from space 1 (end points)
+       int i;
+
+       if( srt == NULL ) {
+               return NULL;
+       }
+
+       if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
+               return NULL;
+       }
+
+       if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
+               free( nrt );
+               return NULL;
+       }
+
+       things0.nalloc = 2048;
+       things0.nused = 0;
+       things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
+       if( things0.things == NULL ) {
+               free( nrt->hash );
+               free( nrt );
+               return NULL;
+       }
+
+       things1.nalloc = 2048;
+       things1.nused = 0;
+       things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
+       if( things1.things == NULL ) {
+               free( nrt->hash );
+               free( nrt );
+               return NULL;
+       }
+
+       sst = srt->hash;                                                                                        // convenience pointers (src symtab)
+       nst = nrt->hash;
+
+       rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
+       rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
+
+       for( i = 0; i < things0.nused; i++ ) {
+               rte = (rtable_ent_t *) things0.things[i];
+               rte->refs++;                                                                                            // rtes can be removed, so we track references
+               rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
+       }
+
+       for( i = 0; i < things1.nused; i++ ) {
+               ep = (endpoint_t *) things1.things[i];
+               rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
+       }
+
+       free( things0.things );
+       free( things1.things );
+       return nrt;
+}
+
 /*
        Given a name, find the endpoint struct in the provided route table.
 */