Add ability to control route table req frequency
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
index f098f53..c196a9a 100644 (file)
@@ -1,8 +1,8 @@
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -220,7 +220,7 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
                snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
                rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
                smsg->len = strlen( smsg->payload ) + 1;
-       
+
                smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
                if( (state = smsg->state) != RMR_OK ) {
                        rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
@@ -236,15 +236,21 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
 
 /*
        Send an ack to the route table manager for a table ID that we are
-       processing.      State is 1 for OK, and 0 for failed. Reason might 
+       processing.      State is 1 for OK, and 0 for failed. Reason might
        be populated if we know why there was a failure.
 
        Context should be the PRIVATE context that we use for messages
        to route manger and NOT the user's context.
+
+       If a message buffere is passed we use that and use return to sender
+       assuming that this might be a response to a call and that is needed
+       to send back to the proper calling thread. If msg is nil, we allocate
+       and use it.
 */
-static void send_rt_ack( uta_ctx_t* ctx, int state, char* reason ) {
-       rmr_mbuf_t*     smsg;
-       
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+       int             use_rts = 1;
+       int             payload_size = 1024;
+
        if( ctx == NULL || ctx->rtg_whid < 0 ) {
                return;
        }
@@ -253,24 +259,36 @@ static void send_rt_ack( uta_ctx_t* ctx, int state, char* reason ) {
                return;
        }
 
-       smsg = rmr_alloc_msg( ctx, 1024 );
+       if( smsg != NULL ) {
+               smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
+       } else {
+               use_rts = 0;
+               smsg = rmr_alloc_msg( ctx, payload_size );
+       }
+
        if( smsg != NULL ) {
                smsg->mtype = RMRRM_TABLE_STATE;
-               smsg->sub_id = 0;
-               snprintf( smsg->payload, 1024, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
-                       ctx->table_id == NULL ? "<id-missing>" : ctx->table_id, reason == NULL ? "" : reason );
+               smsg->sub_id = -1;
+               snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
+                       table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
 
                smsg->len = strlen( smsg->payload ) + 1;
-       
-               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
-               smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+
+               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
+               if( use_rts ) {
+                       smsg = rmr_rts_msg( ctx, smsg );
+               } else {
+                       smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+               }
                if( (state = smsg->state) != RMR_OK ) {
                        rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
                        rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
                        ctx->rtg_whid = -1;
                }
 
-               rmr_free_msg( smsg );
+               if( ! use_rts ) {
+                       rmr_free_msg( smsg );                   // if not our message we must free the leftovers
+               }
        }
 }
 
@@ -280,7 +298,7 @@ static void send_rt_ack( uta_ctx_t* ctx, int state, char* reason ) {
        must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
 */
 static char* clip( char* buf ) {
-       char*   tok;
+       char*   tok;
 
        while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
                buf++;
@@ -402,8 +420,8 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
        char*   tok;
        int             ntoks;
        uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
-       char*   tokens[128];
-       char*   gtokens[64];
+       char*   tokens[128];
+       char*   gtokens[64];
        int             i;
        int             ngtoks;                         // number of tokens in the group list
        int             grp;                            // index into group list
@@ -411,7 +429,7 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
        ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
        rr_field = clip( rr_field );
 
-       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
                (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
                has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
 
@@ -456,7 +474,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
        char*   tok;
        int             ntoks;
        uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
-       char*   tokens[128];
+       char*   tokens[128];
 
        if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
                return;
@@ -464,7 +482,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
 
        ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
 
-       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
                (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
                has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
 
@@ -500,7 +518,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
        char*   tok;
        int             ntoks;
-       char*   tokens[128];
+       char*   tokens[128];
        int             i;
        int             state;
        endpoint_t*     ep;                                             // endpoint struct for the owner
@@ -531,7 +549,7 @@ static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, in
 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
        char*   tok;
        int             ntoks;
-       char*   tokens[128];
+       char*   tokens[128];
        int             i;
 
        if( rtab->hash == NULL ) {
@@ -550,8 +568,13 @@ static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
 /*
        Parse a partially parsed meid record. Tokens[0] should be one of:
                meid_map, mme_ar, mme_del.
+
+       pctx is the private context needed to return an ack/nack using the provided
+       message buffer with the route managers address info.
 */
-static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
+static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
+       char wbuf[1024];
+
        if( tokens == NULL || ntoks < 1 ) {
                return;                                                 // silent but should never happen
        }
@@ -567,8 +590,17 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
                                uta_rt_drop( ctx->new_rtable );
+                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as end never made it
                        }
 
+                       if( ctx->table_id != NULL ) {
+                               free( ctx->table_id );
+                       }
+                       if( ntoks >2 ) {
+                               ctx->table_id = strdup( clip( tokens[2] ) );
+                       } else {
+                               ctx->table_id = NULL;
+                       }
                        ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
                        ctx->new_rtable->mupdates = 0;
                        if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
@@ -577,6 +609,8 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                                if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
                                        if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
                                                rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
+                                               snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                uta_rt_drop( ctx->new_rtable );
                                                ctx->new_rtable = NULL;
                                                return;
@@ -591,6 +625,7 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                                        ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
                                        ctx->new_rtable = NULL;
                                        if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
+                                       send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
 
                                        if( vlevel > 0 ) {
                                                rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
@@ -606,9 +641,9 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                }
 
                return;
-       }       
+       }
 
-       if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
+       if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
                return;
        }
@@ -672,8 +707,14 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
        messages back to the route manager.  The regular ctx is the ctx that
        the user has been given and thus that's where we have to hang the route
        table we're working with.
+
+       If mbuf is given, and we need to ack, then we ack using the mbuf and a
+       return to sender call (allows route manager to use wh_call() to send
+       an update and rts is required to get that back to the right thread).
+       If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+       will be used.
 */
-static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
        int i;
        int ntoks;                                                      // number of tokens found in something
        int ngtoks;
@@ -693,6 +734,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
        for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
        *(tok+1) = 0;
 
+       memset( tokens, 0, sizeof( tokens ) );
        if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
                tokens[0] = clip( tokens[0] );
                switch( *(tokens[0]) ) {
@@ -723,7 +765,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                        rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
                                                                ctx->new_rtable->updates, tokens[2] );
                                                        snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
-                                                       send_rt_ack( pctx, RMR_OK, wbuf );
+                                                       send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                        uta_rt_drop( ctx->new_rtable );
                                                        ctx->new_rtable = NULL;
                                                        break;
@@ -744,14 +786,15 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                        rt_stats( ctx->rtable );
                                                }
 
-                                               send_rt_ack( pctx, RMR_OK, NULL );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+                                               ctx->rtable_ready = 1;                                                  // route based sends can now happen
                                        } else {
                                                if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
                                        }
                                } else {                                                                                                                        // start a new table.
                                        if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
-                                               send_rt_ack( pctx, !RMR_OK, "table not complete" );                     // nack the one that was pending as end never made it
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
 
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
@@ -761,7 +804,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                free( ctx->table_id );
                                        }
                                        if( ntoks >2 ) {
-                                               ctx->table_id = strdup( tokens[2] );
+                                               ctx->table_id = strdup( clip( tokens[2] ) );
                                        } else {
                                                ctx->table_id = NULL;
                                        }
@@ -773,7 +816,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                }
                                break;
 
-                       case 'm':                                                               // mse entry or one of the meid_ records
+                       case 'm':                                                                       // mse entry or one of the meid_ records
                                if( strcmp( tokens[0], "mse" ) == 0 ) {
                                        if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                                break;
@@ -787,7 +830,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
                                        ctx->new_rtable->updates++;
                                } else {
-                                       meid_parser( ctx, tokens, ntoks, vlevel );
+                                       meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
                                }
                                break;
 
@@ -848,7 +891,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                if( ctx->table_id != NULL ) {
                                                        free( ctx->table_id );
                                                }
-                                               ctx->table_id = strdup( tokens[2] );
+                                               ctx->table_id = strdup( clip( tokens[2] ) );
                                        }
 
                                        ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
@@ -897,7 +940,8 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
                }
        }
 
-       for( rec = fbuf; rec && *rec; rec = eor+1 ) {
+       rec = fbuf;
+       while( rec && *rec ) {
                rcount++;
                if( (eor = strchr( rec, '\n' )) != NULL ) {
                        *eor = 0;
@@ -908,7 +952,9 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
                        return;
                }
 
-               parse_rt_rec( ctx, NULL, rec, vlevel );                 // no pvt context as we can't ack
+               parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
+
+               rec = eor+1;
        }
 
        if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
@@ -1003,7 +1049,7 @@ static char* uta_fib( char* fname ) {
                                fsize = stats.st_size;                                          // stat ok, save the file size
                        }
                } else {
-                       fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
+                       fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
                }
        }
 
@@ -1047,6 +1093,7 @@ static route_table_t* uta_rt_init( ) {
        if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
                return NULL;
        }
+       memset( rt, 0, sizeof( *rt ) );
 
        if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
                free( rt );
@@ -1083,7 +1130,9 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in
        things.nalloc = 2048;
        things.nused = 0;
        things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
+       memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
        things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
+       memset( things.names, 0, sizeof( char * ) * things.nalloc );
        if( things.things == NULL ) {
                if( free_on_err ) {
                        free( nrt->hash );
@@ -1137,9 +1186,9 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) {
 }
 
 /*
-       Creates a new route table and then clones  _all_ of the given route table (references 
-       both endpoints AND the route table entries. Needed to support a partial update where 
-       some route table entries will not be deleted if not explicitly in the update and when 
+       Creates a new route table and then clones  _all_ of the given route table (references
+       both endpoints AND the route table entries. Needed to support a partial update where
+       some route table entries will not be deleted if not explicitly in the update and when
        we are adding/replacing meid references.
 */
 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
@@ -1198,7 +1247,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
                return NULL;
        }
 
-       if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
+       if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
                if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
                        errno = ENOMEM;
@@ -1235,4 +1284,18 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
        return key;
 }
 
+/*
+       Given a route table and meid string, find the owner (if known). Returns a pointer to
+       the endpoint struct or nil.
+*/
+static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
+       endpoint_t* ep;         // the ep we found in the hash
+
+       if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
+               return NULL;
+       }
+
+       return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
+}
+
 #endif