feature(routes): Add rtable update
[ric-plt/lib/rmr.git] / src / common / src / rt_generic_static.c
index 4dd9344..64cf47f 100644 (file)
@@ -44,6 +44,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <netdb.h>
 
 
 /*
@@ -56,14 +57,111 @@ typedef struct thing_list {
        void** things;
 } thing_list_t;
 
+// ---- debugging/testing -------------------------------------------------------------------------
+
+/*
+       Dump stats for an endpoint in the RT.
+*/
+static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
+       int*    counter;
+       endpoint_t* ep;
+
+       if( (ep = (endpoint_t *) thing) == NULL ) {
+               return;
+       }
+
+       if( (counter = (int *) vcounter) != NULL ) {
+               (*counter)++;
+       }
+
+       fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
+}
+
+/*
+       Dump stats for a route entry in the table.
+*/
+static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
+       int*    counter;
+       rtable_ent_t* rte;                      // thing is really an rte
+       int             mtype;
+       int             sid;
+
+       if( (rte = (rtable_ent_t *) thing) == NULL ) {
+               return;
+       }
+
+       if( (counter = (int *) vcounter) != NULL ) {
+               (*counter)++;
+       }
+
+       mtype = rte->key & 0xffff;
+       sid = (int) (rte->key >> 32);
+
+       fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
+}
+
+/*
+       Given a route table, cause some stats to be spit out.
+*/
+static void  rt_stats( route_table_t* rt ) {
+       int* counter;
+
+       if( rt == NULL ) {
+               fprintf( stderr, "[DBUG] rtstats: nil table\n" );
+               return;
+       }
+
+       counter = (int *) malloc( sizeof( int ) );
+       *counter = 0;
+       fprintf( stderr, "[DBUG] rtstats:\n" );
+       rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter );                // run endpoints in the active table
+       fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
+
+       *counter = 0;
+       rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter );               // run entries
+       fprintf( stderr, "[DBUG] %d entries\n", *counter );
+
+       free( counter );
+}
+
+
+// ------------------------------------------------------------------------------------------------
+/*
+       Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
+       must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
+*/
+static char* clip( char* buf ) {
+       char*   tok;
+
+       while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
+               buf++;
+       }
+
+       if( (tok = strchr( buf, '#' )) != NULL ) {
+               if( tok == buf ) {
+                       return buf;                                     // just push back; leading comment sym handled there
+               }
+
+               if( isspace( *(tok-1) ) ) {
+                       *tok = 0;
+               }
+       }
+
+       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
+       *(tok+1) = 0;
+
+       return buf;
+}
+
 
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
        is the number of group slots to allocate in the entry.
 */
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) {
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
        rtable_ent_t* rte;
+       rtable_ent_t* old_rte;          // entry which was already in the table for the key
 
        if( rt == NULL ) {
                return NULL;
@@ -74,6 +172,8 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
                return NULL;
        }
        memset( rte, 0, sizeof( *rte ) );
+       rte->refs = 1;
+       rte->key = key;
 
        if( nrrgroups <= 0 ) {
                nrrgroups = 10;
@@ -87,12 +187,113 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
        memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
        rte->nrrgroups = nrrgroups;
 
-       rmr_sym_map( rt->hash, mtype, rte );                                                    // add to hash using numeric mtype as key
+       if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
+               del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
+       }
+
+       rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: mt=%d groups=%d\n", mtype, nrrgroups );
+       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
        return rte;
 }
 
+/*
+       This accepts partially parsed information from a record sent by route manager or read from
+       a file such that:
+               ts_field is the msg-type,sender field
+               subid is the integer subscription id
+               rr_field is the endpoint information for round robening message over
+
+       If all goes well, this will add an RTE to the table under construction.
+
+       The ts_field is checked to see if we should ingest this record. We ingest if one of
+       these is true:
+               there is no sender info (a generic entry for all)
+               there is sender and our host:port matches one of the senders
+               the sender info is an IP address that matches one of our IP addresses
+*/
+static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
+       rtable_ent_t*   rte;            // route table entry added
+       char*   tok;
+       int             ntoks;
+       uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
+       char*   tokens[128];
+       char*   gtokens[64];
+       int             i;
+       int             ngtoks;                         // number of tokens in the group list
+       int             grp;                            // index into group list
+
+       ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
+       rr_field = clip( rr_field );
+
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+               (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
+               has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
+
+                       key = build_rt_key( subid, atoi( ts_field ) );
+
+                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+
+                       if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
+                               rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
+
+                               for( grp = 0; grp < ngtoks; grp++ ) {
+                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
+                                               for( i = 0; i < ntoks; i++ ) {
+                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", ts_field );
+                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+                                               }
+                                       }
+                               }
+                       }
+               } else {
+                       if( DEBUG || (vlevel > 2) )
+                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+               }
+}
+
+/*
+       Trash_entry takes a partially parsed record from the input and
+       will delete the entry if the sender,mtype matches us or it's a
+       generic mtype. The refernce in the new table is removed and the
+       refcounter for the actual rte is decreased. If that ref count is
+       0 then the memory is freed (handled byh the del_rte call).
+*/
+static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
+       rtable_ent_t*   rte;            // route table entry to be 'deleted'
+       char*   tok;
+       int             ntoks;
+       uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
+       char*   tokens[128];
+
+       if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
+               return;
+       }
+
+       ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
+
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+               (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
+               has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
+
+               key = build_rt_key( subid, atoi( ts_field ) );
+               rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
+               if( rte != NULL ) {
+                       if( DEBUG || (vlevel > 1) ) {
+                                fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
+                       }
+                       rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
+                       del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
+               } else {
+                       if( DEBUG || (vlevel > 1) ) {
+                               fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+                       }
+               }
+       } else {
+               if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
+       }
+}
+
 /*
        Parse a single record recevied from the route table generator, or read
        from a static route table file.  Start records cause a new table to
@@ -100,6 +301,11 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
        entry records are added to the currenly 'in progress' table, and an
        end record causes the in progress table to be finalised and the
        currently active table is replaced.
+
+       We expect one of several types:
+               newrt|{start|end}
+               rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
+               mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
 */
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        int i;
@@ -128,7 +334,22 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                        case '#':                                                                                               // and comment lines
                                break;
 
+                       case 'd':                                                                                               // del | [sender,]mtype | sub-id
+                               if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
+                                       break;
+                               }
+
+                               if( ntoks < 3 ) {
+                                       if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
+                                       break;
+                               }
+
+                               trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
+                               ctx->new_rtable->updates++;
+                               break;
+
                        case 'n':                                                                                               // newrt|{start|end}
+                               tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable ) {
                                                uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
@@ -136,6 +357,13 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                                ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
                                                ctx->new_rtable = NULL;
                                                if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
+
+                                               if( vlevel > 0 ) {
+                                                       fprintf( stderr, "[DBUG] old route table:\n" );
+                                                       rt_stats( ctx->old_rtable );
+                                                       fprintf( stderr, "[DBUG] new route table:\n" );
+                                                       rt_stats( ctx->rtable );
+                                               }
                                        } else {
                                                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
@@ -155,33 +383,78 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                }
                                break;
 
+                       case 'm':                                       // assume mse entry
+                               if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
+                                       break;
+                               }
+
+                               if( ntoks < 4 ) {
+                                       if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
+                                       break;
+                               }
+
+                               build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
+                               ctx->new_rtable->updates++;
+                               break;
+
                        case 'r':                                       // assume rt entry
                                if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                        break;
                                }
 
-                               if( ((tok = strchr( tokens[1], ',' )) == NULL ) ||                                      // no sender names
-                                       (uta_has_str( tokens[1],  ctx->my_name, ',', 127) >= 0) ||              // our name isn't in the list
-                                       has_myip( tokens[1], ctx->ip_list, ',', 127 ) ) {                               // the list has one of our IP addresses
-
-                                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s\n", tokens[1] );
-
-                                       if( (ngtoks = uta_tokenise( tokens[2], gtokens, 64, ';' )) > 0 ) {                                      // split last field by groups first
-                                               rte = uta_add_rte( ctx->new_rtable, atoi( tokens[1] ), ngtoks );                        // get/create entry for message type
-                                               for( grp = 0; grp < ngtoks; grp++ ) {
-                                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
-                                                               for( i = 0; i < ntoks; i++ ) {
-                                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", tokens[i] );
-                                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
-                                                               }
-                                                       }
-                                               }
-                                       }
+                               ctx->new_rtable->updates++;
+                               if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
+                                       build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
                                } else {
-                                       if( DEBUG || (vlevel > 2) )
-                                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+                                       build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
                                }
+                               break;
+
+                       case 'u':                                                                                               // update current table, not a total replacement
+                               tokens[1] = clip( tokens[1] );
+                               if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
+                                       if( ntoks >2 ) {
+                                               if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
+                                                       fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+                                                               ctx->new_rtable->updates, tokens[2] );
+                                                       uta_rt_drop( ctx->new_rtable );
+                                                       ctx->new_rtable = NULL;
+                                                       break;
+                                               }
+                                       }
+
+                                       if( ctx->new_rtable ) {
+                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
+                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
+                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
+                                               ctx->new_rtable = NULL;
+                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
+
+                                               if( vlevel > 0 ) {
+                                                       fprintf( stderr, "[DBUG] old route table:\n" );
+                                                       rt_stats( ctx->old_rtable );
+                                                       fprintf( stderr, "[DBUG] updated route table:\n" );
+                                                       rt_stats( ctx->rtable );
+                                               }
+                                       } else {
+                                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
+                                               ctx->new_rtable = NULL;
+                                       }
+                               } else {                                                                                        // start a new table.
+                                       if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
+                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
+                                               uta_rt_drop( ctx->new_rtable );
+                                       }
+
+                                       if( ctx->rtable )  {
+                                               ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
+                                       } else {
+                                               ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
+                                       }
 
+                                       ctx->new_rtable->updates = 0;                                           // init count of updates received
+                                       if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
+                               }
                                break;
 
                        default:
@@ -253,6 +526,18 @@ static void collect_things( void* st, void* entry, char const* name, void* thing
        Called to delete a route table entry struct. We delete the array of endpoint
        pointers, but NOT the endpoints referenced as those are referenced from
        multiple entries.
+
+       Route table entries can be concurrently referenced by multiple symtabs, so
+       the actual delete happens only if decrementing the rte's ref count takes it
+       to 0. Thus, it is safe to call this function across a symtab when cleaning up
+       the symtab, or overlaying an entry.
+
+       This function uses ONLY the pointer to the rte (thing) and ignores the other
+       information that symtab foreach function passes (st, entry, and data) which
+       means that it _can_ safetly be used outside of the foreach setting. If
+       the function is changed to depend on any of these three, then a stand-alone
+       rte_cleanup() function should be added and referenced by this, and refererences
+       to this outside of the foreach world should be changed.
 */
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
        rtable_ent_t*   rte;
@@ -262,6 +547,11 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
                return;
        }
 
+       rte->refs--;
+       if( rte->refs > 0 ) {                   // something still referencing, so it lives
+               return;
+       }
+
        if( rte->rrgroups ) {                                                                   // clean up the round robin groups
                for( i = 0; i < rte->nrrgroups; i++ ) {
                        if( rte->rrgroups[i] ) {
@@ -364,7 +654,6 @@ static route_table_t* uta_rt_init( ) {
 static route_table_t* uta_rt_clone( route_table_t* srt ) {
        endpoint_t*             ep;             // an endpoint
        route_table_t*  nrt;    // new route table
-       route_table_t*  art;    // active route table
        void*   sst;                    // source symtab
        void*   nst;                    // new symtab
        thing_list_t things;
@@ -406,6 +695,74 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) {
        return nrt;
 }
 
+/*
+       Clones _all_ of the given route table (references both endpoints AND the route table
+       entries. Needed to support a partial update where some route table entries will not
+       be deleted if not explicitly in the update.
+*/
+static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
+       endpoint_t*             ep;             // an endpoint
+       rtable_ent_t*   rte;    // a route table entry
+       route_table_t*  nrt;    // new route table
+       void*   sst;                    // source symtab
+       void*   nst;                    // new symtab
+       thing_list_t things0;   // things from space 0 (table entries)
+       thing_list_t things1;   // things from space 1 (end points)
+       int i;
+
+       if( srt == NULL ) {
+               return NULL;
+       }
+
+       if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
+               return NULL;
+       }
+
+       if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
+               free( nrt );
+               return NULL;
+       }
+
+       things0.nalloc = 2048;
+       things0.nused = 0;
+       things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
+       if( things0.things == NULL ) {
+               free( nrt->hash );
+               free( nrt );
+               return NULL;
+       }
+
+       things1.nalloc = 2048;
+       things1.nused = 0;
+       things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
+       if( things1.things == NULL ) {
+               free( nrt->hash );
+               free( nrt );
+               return NULL;
+       }
+
+       sst = srt->hash;                                                                                        // convenience pointers (src symtab)
+       nst = nrt->hash;
+
+       rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
+       rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
+
+       for( i = 0; i < things0.nused; i++ ) {
+               rte = (rtable_ent_t *) things0.things[i];
+               rte->refs++;                                                                                            // rtes can be removed, so we track references
+               rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
+       }
+
+       for( i = 0; i < things1.nused; i++ ) {
+               ep = (endpoint_t *) things1.things[i];
+               rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
+       }
+
+       free( things0.things );
+       free( things1.things );
+       return nrt;
+}
+
 /*
        Given a name, find the endpoint struct in the provided route table.
 */
@@ -463,4 +820,21 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
 }
 
 
+/*
+       Given a session id and message type build a key that can be used to look up the rte in the route
+       table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
+*/
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
+       uint64_t key;
+
+       if( sub_id == UNSET_SUBID ) {
+               key = 0xffffffff00000000 | mtype;
+       } else {
+               key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
+       }
+
+       return key;
+}
+
+
 #endif