fix(rtable): Potential memory leak in rte replace
[ric-plt/lib/rmr.git] / src / common / src / rt_generic_static.c
index 26e907e..ef38b8a 100644 (file)
@@ -1,14 +1,14 @@
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
 /*
        Mnemonic:       rt_generic_static.c
 
 /*
        Mnemonic:       rt_generic_static.c
-       Abstract:       These are route table functions which are not specific to the 
+       Abstract:       These are route table functions which are not specific to the
                                underlying protocol.  rtable_static, and rtable_nng_static
                                have transport provider specific code.
 
                                This file must be included before the nng/nano specific file as
                                underlying protocol.  rtable_static, and rtable_nng_static
                                have transport provider specific code.
 
                                This file must be included before the nng/nano specific file as
-                               it defines types. 
+                               it defines types.
 
        Author:         E. Scott Daniels
        Date:           5  February 2019
 
        Author:         E. Scott Daniels
        Date:           5  February 2019
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <netdb.h>
 
 
 /*
 
 
 /*
-       Passed to a symtab foreach callback to construct a list of pointers from 
+       Passed to a symtab foreach callback to construct a list of pointers from
        a current symtab.
 */
 typedef struct thing_list {
        a current symtab.
 */
 typedef struct thing_list {
@@ -56,14 +57,45 @@ typedef struct thing_list {
        void** things;
 } thing_list_t;
 
        void** things;
 } thing_list_t;
 
+// ------------------------------------------------------------------------------------------------
+
+
+/*
+       Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
+       must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
+*/
+static char* clip( char* buf ) {
+       char*   tok;
+
+       while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
+               buf++;
+       }
+
+       if( (tok = strchr( buf, '#' )) != NULL ) {
+               if( tok == buf ) {
+                       return buf;                                     // just push back; leading comment sym handled there
+               }
+
+               if( isspace( *(tok-1) ) ) {
+                       *tok = 0;
+               }
+       }
+
+       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
+       *(tok+1) = 0;
+
+       return buf;
+}
+
 
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
        is the number of group slots to allocate in the entry.
 */
 
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
        is the number of group slots to allocate in the entry.
 */
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) {
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
        rtable_ent_t* rte;
        rtable_ent_t* rte;
+       rtable_ent_t* old_rte;          // entry which was already in the table for the key
 
        if( rt == NULL ) {
                return NULL;
 
        if( rt == NULL ) {
                return NULL;
@@ -74,6 +106,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
                return NULL;
        }
        memset( rte, 0, sizeof( *rte ) );
                return NULL;
        }
        memset( rte, 0, sizeof( *rte ) );
+       rte->refs = 1;
 
        if( nrrgroups <= 0 ) {
                nrrgroups = 10;
 
        if( nrrgroups <= 0 ) {
                nrrgroups = 10;
@@ -87,19 +120,83 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups )
        memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
        rte->nrrgroups = nrrgroups;
 
        memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
        rte->nrrgroups = nrrgroups;
 
-       rmr_sym_map( rt->hash, mtype, rte );                                                    // add to hash using numeric mtype as key
+       if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
+               del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
+       }
+
+       rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
 
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: mt=%d groups=%d\n", mtype, nrrgroups );
+       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups );
        return rte;
 }
 
        return rte;
 }
 
+/*
+       This accepts partially parsed information from a record sent by route manager or read from
+       a file such that:
+               ts_field is the msg-type,sender field
+               subid is the integer subscription id
+               rr_field is the endpoint information for round robening message over
+
+       If all goes well, this will add an RTE to the table under construction.
+
+       The ts_field is checked to see if we should ingest this record. We ingest if one of
+       these is true:
+               there is no sender info (a generic entry for all)
+               there is sender and our host:port matches one of the senders
+               the sender info is an IP address that matches one of our IP addresses
+*/
+static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
+       rtable_ent_t*   rte;            // route table entry added
+       char*   tok;
+       int             ntoks;
+       uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
+       char*   tokens[128];
+       char*   gtokens[64];
+       int             i;
+       int             ngtoks;                         // number of tokens in the group list
+       int             grp;                            // index into group list
+
+       ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
+       rr_field = clip( rr_field );
+
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+               (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
+               has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
+
+                       key = build_rt_key( subid, atoi( ts_field ) );
+
+                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key );
+
+                       if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
+                               rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
+
+                               for( grp = 0; grp < ngtoks; grp++ ) {
+                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
+                                               for( i = 0; i < ntoks; i++ ) {
+                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", ts_field );
+                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+                                               }
+                                       }
+                               }
+                       }
+               } else {
+                       if( DEBUG || (vlevel > 2) )
+                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+               }
+}
+
 /*
        Parse a single record recevied from the route table generator, or read
 /*
        Parse a single record recevied from the route table generator, or read
-       from a static route table file.  Start records cause a new table to 
+       from a static route table file.  Start records cause a new table to
        be started (if a partial table was received it is discarded. Table
        be started (if a partial table was received it is discarded. Table
-       entry records are added to the currenly 'in progress' table, and an 
+       entry records are added to the currenly 'in progress' table, and an
        end record causes the in progress table to be finalised and the
        currently active table is replaced.
        end record causes the in progress table to be finalised and the
        currently active table is replaced.
+
+       We expect one of several types:
+               newrt|{start|end}
+               rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
+               mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
 */
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        int i;
 */
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        int i;
@@ -129,6 +226,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                break;
 
                        case 'n':                                                                                               // newrt|{start|end}
                                break;
 
                        case 'n':                                                                                               // newrt|{start|end}
+                               tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable ) {
                                                uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable ) {
                                                uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
@@ -144,7 +242,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                                if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
                                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                                if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
-                                       }       
+                                       }
 
                                        if( ctx->rtable )  {
                                                ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
 
                                        if( ctx->rtable )  {
                                                ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
@@ -155,33 +253,29 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                }
                                break;
 
                                }
                                break;
 
+                       case 'm':                                       // assume mse entry
+                               if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
+                                       break;
+                               }
+
+                               if( ntoks < 4 ) {
+                                       if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
+                                       break;
+                               }
+
+                               build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
+                               break;
+
                        case 'r':                                       // assume rt entry
                                if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                        break;
                                }
 
                        case 'r':                                       // assume rt entry
                                if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                        break;
                                }
 
-                               if( ((tok = strchr( tokens[1], ',' )) == NULL ) ||                                      // no sender names
-                                       (uta_has_str( tokens[1],  ctx->my_name, ',', 127) >= 0) ||              // our name isn't in the list
-                                       has_myip( tokens[1], ctx->ip_list, ',', 127 ) ) {                               // the list has one of our IP addresses 
-
-                                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s\n", tokens[1] );
-                                       
-                                       if( (ngtoks = uta_tokenise( tokens[2], gtokens, 64, ';' )) > 0 ) {                                      // split last field by groups first
-                                               rte = uta_add_rte( ctx->new_rtable, atoi( tokens[1] ), ngtoks );                        // get/create entry for message type
-                                               for( grp = 0; grp < ngtoks; grp++ ) {
-                                                       if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
-                                                               for( i = 0; i < ntoks; i++ ) {
-                                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", tokens[i] );
-                                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
-                                                               }
-                                                       }
-                                               }
-                                       }
+                               if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
+                                       build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
                                } else {
                                } else {
-                                       if( DEBUG || (vlevel > 2) ) 
-                                               fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+                                       build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
                                }
                                }
-
                                break;
 
                        default:
                                break;
 
                        default:
@@ -195,9 +289,9 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
        This function attempts to open a static route table in order to create a 'seed'
        table during initialisation.  The environment variable RMR_SEED_RT is expected
        to contain the necessary path to the file. If missing, or if the file is empty,
        This function attempts to open a static route table in order to create a 'seed'
        table during initialisation.  The environment variable RMR_SEED_RT is expected
        to contain the necessary path to the file. If missing, or if the file is empty,
-       no route table will be available until one is received from the generator. 
+       no route table will be available until one is received from the generator.
 
 
-       This function is probably most useful for testing situations, or extreme 
+       This function is probably most useful for testing situations, or extreme
        cases where the routes are static.
 */
 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
        cases where the routes are static.
 */
 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
@@ -250,9 +344,21 @@ static void collect_things( void* st, void* entry, char const* name, void* thing
 }
 
 /*
 }
 
 /*
-       Called to delete a route table entry struct. We delete the array of endpoint 
-       pointers, but NOT the endpoints referenced as those are referenced from 
+       Called to delete a route table entry struct. We delete the array of endpoint
+       pointers, but NOT the endpoints referenced as those are referenced from
        multiple entries.
        multiple entries.
+
+       Route table entries can be concurrently referenced by multiple symtabs, so 
+       the actual delete happens only if decrementing the rte's ref count takes it
+       to 0. Thus, it is safe to call this function across a symtab when cleaning up
+       the symtab, or overlaying an entry.
+
+       This function uses ONLY the pointer to the rte (thing) and ignores the other
+       information that symtab foreach function passes (st, entry, and data) which 
+       means that it _can_ safetly be used outside of the foreach setting. If
+       the function is changed to depend on any of these three, then a stand-alone
+       rte_cleanup() function should be added and referenced by this, and refererences
+       to this outside of the foreach world should be changed.
 */
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
        rtable_ent_t*   rte;
 */
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
        rtable_ent_t*   rte;
@@ -262,6 +368,11 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
                return;
        }
 
                return;
        }
 
+       rte->refs--;
+       if( rte->refs > 0 ) {                   // something still referencing, so it lives
+               return;
+       }
+
        if( rte->rrgroups ) {                                                                   // clean up the round robin groups
                for( i = 0; i < rte->nrrgroups; i++ ) {
                        if( rte->rrgroups[i] ) {
        if( rte->rrgroups ) {                                                                   // clean up the round robin groups
                for( i = 0; i < rte->nrrgroups; i++ ) {
                        if( rte->rrgroups[i] ) {
@@ -291,7 +402,7 @@ static char* uta_fib( char* fname ) {
        off_t           nread;                  // number of bytes read
        int                     fd;
        char*           buf;                    // input buffer
        off_t           nread;                  // number of bytes read
        int                     fd;
        char*           buf;                    // input buffer
-       
+
        if( (fd = open( fname, O_RDONLY )) >= 0 ) {
                if( fstat( fd, &stats ) >= 0 ) {
                        if( stats.st_size <= 0 ) {                                      // empty file
        if( (fd = open( fname, O_RDONLY )) >= 0 ) {
                if( fstat( fd, &stats ) >= 0 ) {
                        if( stats.st_size <= 0 ) {                                      // empty file
@@ -362,9 +473,9 @@ static route_table_t* uta_rt_init( ) {
        the context.
 */
 static route_table_t* uta_rt_clone( route_table_t* srt ) {
        the context.
 */
 static route_table_t* uta_rt_clone( route_table_t* srt ) {
-       endpoint_t*             ep;             // an endpoint 
+       endpoint_t*             ep;             // an endpoint
        route_table_t*  nrt;    // new route table
        route_table_t*  nrt;    // new route table
-       route_table_t*  art;    // active route table
+       //route_table_t*        art;    // active route table
        void*   sst;                    // source symtab
        void*   nst;                    // new symtab
        thing_list_t things;
        void*   sst;                    // source symtab
        void*   nst;                    // new symtab
        thing_list_t things;
@@ -396,7 +507,7 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) {
        nst = nrt->hash;
 
        rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
        nst = nrt->hash;
 
        rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
-       
+
        for( i = 0; i < things.nused; i++ ) {
                ep = (endpoint_t *) things.things[i];
                rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
        for( i = 0; i < things.nused; i++ ) {
                ep = (endpoint_t *) things.things[i];
                rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
@@ -406,6 +517,75 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) {
        return nrt;
 }
 
        return nrt;
 }
 
+/*
+       Clones _all_ of the given route table (references both endpoints AND the route table
+       entries. Needed to support a partial update where some route table entries will not 
+       be deleted if not explicitly in the update.
+*/
+static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
+       endpoint_t*             ep;             // an endpoint
+       rtable_ent_t*   rte;    // a route table entry
+       route_table_t*  nrt;    // new route table
+       //route_table_t*        art;    // active route table
+       void*   sst;                    // source symtab
+       void*   nst;                    // new symtab
+       thing_list_t things0;   // things from space 0 (table entries)
+       thing_list_t things1;   // things from space 1 (end points)
+       int i;
+
+       if( srt == NULL ) {
+               return NULL;
+       }
+
+       if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
+               return NULL;
+       }
+
+       if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
+               free( nrt );
+               return NULL;
+       }
+
+       things0.nalloc = 2048;
+       things0.nused = 0;
+       things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
+       if( things0.things == NULL ) {
+               free( nrt->hash );
+               free( nrt );
+               return NULL;
+       }
+
+       things1.nalloc = 2048;
+       things1.nused = 0;
+       things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
+       if( things1.things == NULL ) {
+               free( nrt->hash );
+               free( nrt );
+               return NULL;
+       }
+
+       sst = srt->hash;                                                                                        // convenience pointers (src symtab)
+       nst = nrt->hash;
+
+       rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
+       rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
+
+       for( i = 0; i < things0.nused; i++ ) {
+               rte = (rtable_ent_t *) things0.things[i];
+               rte->refs++;                                                                                            // rtes can be removed, so we track references
+               rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
+       }
+
+       for( i = 0; i < things1.nused; i++ ) {
+               ep = (endpoint_t *) things1.things[i];
+               rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
+       }
+
+       free( things0.things );
+       free( things1.things );
+       return nrt;
+}
+
 /*
        Given a name, find the endpoint struct in the provided route table.
 */
 /*
        Given a name, find the endpoint struct in the provided route table.
 */
@@ -463,4 +643,21 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
 }
 
 
 }
 
 
+/*
+       Given a session id and message type build a key that can be used to look up the rte in the route
+       table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
+*/
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
+       uint64_t key;
+
+       if( sub_id == UNSET_SUBID ) {
+               key = 0xffffffff00000000 | mtype;
+       } else {
+               key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
+       }
+
+       return key;
+}
+
+
 #endif
 #endif