3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
54 typedef struct thing_list {
60 // ------------------------------------------------------------------------------------------------
64 Little ditty to trim whitespace and trailing comments. Like shell, trailing comments
65 must be at the start of a word (i.e. must be immediately preceded by whitespace).
67 static char* clip( char* buf ) {
70 while( *buf && isspace( *buf ) ) { // skip leading whitespace
74 if( (tok = strchr( buf, '#' )) != NULL ) {
76 return buf; // just push back; leading comment sym handled there
79 if( isspace( *(tok-1) ) ) {
84 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
92 Given a message type create a route table entry and add to the hash keyed on the
93 message type. Once in the hash, endpoints can be added with uta_add_ep. Nrrgroups
94 is the number of group slots to allocate in the entry.
96 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
98 rtable_ent_t* old_rte; // entry which was already in the table for the key
104 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
105 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
108 memset( rte, 0, sizeof( *rte ) );
111 if( nrrgroups <= 0 ) {
115 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
116 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
120 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
121 rte->nrrgroups = nrrgroups;
123 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
124 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
127 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
129 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups );
134 This accepts partially parsed information from a record sent by route manager or read from
136 ts_field is the msg-type,sender field
137 subid is the integer subscription id
138 rr_field is the endpoint information for round-robining messages over
140 If all goes well, this will add an RTE to the table under construction.
142 The ts_field is checked to see if we should ingest this record. We ingest if one of
144 there is no sender info (a generic entry for all)
145 there is sender and our host:port matches one of the senders
146 the sender info is an IP address that matches one of our IP addresses
148 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
149 rtable_ent_t* rte; // route table entry added
152 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
156 int ngtoks; // number of tokens in the group list
157 int grp; // index into group list
159 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
160 rr_field = clip( rr_field );
162 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
163 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
164 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
166 key = build_rt_key( subid, atoi( ts_field ) );
168 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key );
170 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
171 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
173 for( grp = 0; grp < ngtoks; grp++ ) {
174 if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
175 for( i = 0; i < ntoks; i++ ) {
176 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field );
177 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
183 if( DEBUG || (vlevel > 2) )
184 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
189 Parse a single record received from the route table generator, or read
190 from a static route table file. Start records cause a new table to
191 be started (if a partial table was received it is discarded). Table
192 entry records are added to the currently 'in progress' table, and an
193 end record causes the in progress table to be finalised and the
194 currently active table is replaced.
196 We expect one of several types:
198 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
199 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
201 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
203 int ntoks; // number of tokens found in something
205 int grp; // group number
206 rtable_ent_t* rte; // route table entry added
208 char* gtokens[64]; // groups
209 char* tok; // pointer into a token or string
215 while( *buf && isspace( *buf ) ) { // skip leading whitespace
218 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
221 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
222 switch( *(tokens[0]) ) {
223 case 0: // ignore blanks
225 case '#': // and comment lines
228 case 'n': // newrt|{start|end}
229 tokens[1] = clip( tokens[1] );
230 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
231 if( ctx->new_rtable ) {
232 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
233 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
234 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
235 ctx->new_rtable = NULL;
236 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
238 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
239 ctx->new_rtable = NULL;
241 } else { // start a new table.
242 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
243 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
244 uta_rt_drop( ctx->new_rtable );
248 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
250 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
252 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
256 case 'm': // assume mse entry
257 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
262 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
266 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
269 case 'r': // assume rt entry
270 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
274 if( ntoks > 3 ) { // assume new entry with subid last
275 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
277 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
282 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
289 This function attempts to open a static route table in order to create a 'seed'
290 table during initialisation. The environment variable RMR_SEED_RT is expected
291 to contain the necessary path to the file. If missing, or if the file is empty,
292 no route table will be available until one is received from the generator.
294 This function is probably most useful for testing situations, or extreme
295 cases where the routes are static.
297 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
300 char* fbuf; // buffer with file contents
301 char* rec; // start of the record
302 char* eor; // end of the record
303 int rcount = 0; // record count for debug
305 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
309 if( (fbuf = uta_fib( fname ) ) == NULL ) { // read file into a single buffer
310 fprintf( stderr, "[WRN] seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
314 if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully opened: %s\n", fname );
315 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
317 if( (eor = strchr( rec, '\n' )) != NULL ) {
321 parse_rt_rec( ctx, rec, vlevel );
324 if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully parsed: %d records\n", rcount );
329 Callback driven for each named thing in a symtab. We collect the pointers to those
330 things for later use (cloning).
332 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
335 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
339 if( thing == NULL ) {
343 tl->things[tl->nused++] = thing; // save a reference to the thing
347 Called to delete a route table entry struct. We delete the array of endpoint
348 pointers, but NOT the endpoints referenced as those are referenced from
351 Route table entries can be concurrently referenced by multiple symtabs, so
352 the actual delete happens only if decrementing the rte's ref count takes it
353 to 0. Thus, it is safe to call this function across a symtab when cleaning up
354 the symtab, or overlaying an entry.
356 This function uses ONLY the pointer to the rte (thing) and ignores the other
357 information that symtab foreach function passes (st, entry, and data) which
358 means that it _can_ safely be used outside of the foreach setting. If
359 the function is changed to depend on any of these three, then a stand-alone
360 rte_cleanup() function should be added and referenced by this, and references
361 to this outside of the foreach world should be changed.
363 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
367 if( (rte = (rtable_ent_t *) thing) == NULL ) {
372 if( rte->refs > 0 ) { // something still referencing, so it lives
376 if( rte->rrgroups ) { // clean up the round robin groups
377 for( i = 0; i < rte->nrrgroups; i++ ) {
378 if( rte->rrgroups[i] ) {
379 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
383 free( rte->rrgroups );
386 free( rte ); // finally, drop the potato
390 Read an entire file into a buffer. We assume for route table files
391 they will be smallish and so this won't be a problem.
392 Returns a pointer to the buffer, or nil. Caller must free.
393 Terminates the buffer with a nil character for string processing.
395 If we cannot stat the file, we assume it's empty or missing and return
396 an empty buffer, as opposed to a nil, so the caller can generate defaults
397 or error if an empty/missing file isn't tolerated.
399 static char* uta_fib( char* fname ) {
401 off_t fsize = 8192; // size of the file
402 off_t nread; // number of bytes read
404 char* buf; // input buffer
406 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
407 if( fstat( fd, &stats ) >= 0 ) {
408 if( stats.st_size <= 0 ) { // empty file
412 fsize = stats.st_size; // stat ok, save the file size
415 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
419 if( fd < 0 ) { // didn't open or empty
420 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
428 // add a size limit check here
430 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
436 nread = read( fd, buf, fsize );
437 if( nread < 0 || nread > fsize ) { // failure of some kind
439 errno = EFBIG; // likely too much to handle
451 Create and initialise a route table; Returns a pointer to the table struct.
453 static route_table_t* uta_rt_init( ) {
456 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
460 if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
469 Clone (sort of) an existing route table. This is done to preserve the endpoint
470 names referenced in a table (and thus existing sessions) when a new set
471 of message type to endpoint name mappings is received. A new route table
472 with only endpoint name references is returned based on the active table in
475 static route_table_t* uta_rt_clone( route_table_t* srt ) {
476 endpoint_t* ep; // an endpoint
477 route_table_t* nrt; // new route table
478 //route_table_t* art; // active route table
479 void* sst; // source symtab
480 void* nst; // new symtab
488 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
492 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
497 things.nalloc = 2048;
499 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
500 if( things.things == NULL ) {
506 sst = srt->hash; // convenience pointers (src symtab)
509 rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
511 for( i = 0; i < things.nused; i++ ) {
512 ep = (endpoint_t *) things.things[i];
513 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
516 free( things.things );
521 Clones _all_ of the given route table (references both endpoints AND the route table
522 entries). Needed to support a partial update where some route table entries will not
523 be deleted if not explicitly in the update.
525 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
526 endpoint_t* ep; // an endpoint
527 rtable_ent_t* rte; // a route table entry
528 route_table_t* nrt; // new route table
529 //route_table_t* art; // active route table
530 void* sst; // source symtab
531 void* nst; // new symtab
532 thing_list_t things0; // things from space 0 (table entries)
533 thing_list_t things1; // things from space 1 (end points)
540 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
544 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
549 things0.nalloc = 2048;
551 things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
552 if( things0.things == NULL ) {
558 things1.nalloc = 2048;
560 things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
561 if( things1.things == NULL ) {
567 sst = srt->hash; // convenience pointers (src symtab)
570 rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
571 rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
573 for( i = 0; i < things0.nused; i++ ) {
574 rte = (rtable_ent_t *) things0.things[i];
575 rte->refs++; // rtes can be removed, so we track references
576 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
579 for( i = 0; i < things1.nused; i++ ) {
580 ep = (endpoint_t *) things1.things[i];
581 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
584 free( things0.things );
585 free( things1.things );
590 Given a name, find the endpoint struct in the provided route table.
592 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
594 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
598 return rmr_sym_get( rt->hash, ep_name, 1 );
602 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
604 static void uta_rt_drop( route_table_t* rt ) {
609 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
610 rmr_sym_free( rt->hash ); // free all of the hash related data
615 Look up and return the pointer to the endpoint struct matching the given name.
616 If not in the hash, a new endpoint is created and added to the hash. Should always
619 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
622 if( !rt || !ep_name || ! *ep_name ) {
623 fprintf( stderr, "[WARN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
628 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
629 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
630 fprintf( stderr, "[WARN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
635 ep->open = 0; // not connected
636 ep->addr = uta_h2ip( ep_name );
637 ep->name = strdup( ep_name );
639 rmr_sym_put( rt->hash, ep_name, 1, ep );
647 Given a session id and message type build a key that can be used to look up the rte in the route
648 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
650 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
653 if( sub_id == UNSET_SUBID ) {
654 key = 0xffffffff00000000 | mtype;
656 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);