3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
54 typedef struct thing_list {
61 // ---- debugging/testing -------------------------------------------------------------------------
64 Dump some stats for an endpoint in the RT. This is generally called to
65 verify endpoints after a table load/change.
67 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
71 if( (ep = (endpoint_t *) thing) == NULL ) {
75 if( (counter = (int *) vcounter) != NULL ) {
79 fprintf( stderr, "[DBUG] RMR rt endpoint: target=%s open=%d\n", ep->name, ep->open );
83 Called to count meid entries in the table. The meid points to an 'owning' endpoint
84 so we can list what we find
86 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
90 if( (ep = (endpoint_t *) thing) == NULL ) {
94 if( (counter = (int *) vcounter) != NULL ) {
98 fprintf( stderr, "[DBUG] RMR meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
102 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
103 the 'source' information and is added to each message.
105 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
109 if( (ep = (endpoint_t *) thing) == NULL ) {
113 if( (id = (char *) vid) == NULL ) {
117 fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
118 (long long) time( NULL ),
122 ep->scounts[EPSC_GOOD],
123 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
124 ep->scounts[EPSC_FAIL],
125 ep->scounts[EPSC_TRANS] );
129 Dump stats for a route entry in the table.
131 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
133 rtable_ent_t* rte; // thing is really an rte
137 if( (rte = (rtable_ent_t *) thing) == NULL ) {
141 if( (counter = (int *) vcounter) != NULL ) {
145 mtype = rte->key & 0xffff;
146 sid = (int) (rte->key >> 32);
148 fprintf( stderr, "[DBUG] RMR rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
152 Given a route table, cause some stats to be spit out.
154 static void rt_stats( route_table_t* rt ) {
158 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
162 counter = (int *) malloc( sizeof( int ) );
164 fprintf( stderr, "[DBUG] RMR route table stats:\n" );
165 fprintf( stderr, "[DBUG] RMR route table endpoints:\n" );
166 rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
167 fprintf( stderr, "[DBUG] RMR rtable: %d known endpoints\n", *counter );
169 fprintf( stderr, "[DBUG] RMR route table entries:\n" );
171 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
172 fprintf( stderr, "[DBUG] RMR rtable: %d mt entries in table\n", *counter );
174 fprintf( stderr, "[DBUG] RMR route table meid map:\n" );
176 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
177 fprintf( stderr, "[DBUG] RMR rtable: %d meids in map\n", *counter );
183 Given a route table, cause endpoint counters to be written to stderr. The id
184 parm is written as the "source" in the output.
186 static void rt_epcounts( route_table_t* rt, char* id ) {
188 fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
192 rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
196 // ------------------------------------------------------------------------------------------------
/*
	Trim leading whitespace, a trailing comment, and trailing whitespace from
	the string, in place. Like shell, a trailing comment must be at the start
	of a word (i.e. the '#' must be immediately preceded by whitespace); a '#'
	embedded in a word is left alone and the search continues with the next one.

	Returns a pointer to the first non-whitespace character in buf. If that
	character is '#' the string is returned without further changes as a
	leading comment symbol is handled by the caller.
*/
static char* clip( char* buf ) {
	char*	tok;

	while( *buf && isspace( (unsigned char) *buf ) ) {			// skip leading whitespace
		buf++;
	}

	if( *buf == '#' ) {
		return buf;						// just push back; leading comment sym handled there
	}

	for( tok = strchr( buf, '#' ); tok != NULL; tok = strchr( tok + 1, '#' ) ) {
		if( isspace( (unsigned char) *(tok-1) ) ) {				// tok > buf is certain: a leading # was returned above
			*tok = 0;											// comment starts a word; chop it
			break;
		}
	}

	tok = buf + strlen( buf );									// at the terminating nil; safe for an empty string
	while( tok > buf && isspace( (unsigned char) *(tok-1) ) ) {	// trim trailing spaces too
		tok--;
	}
	*tok = 0;

	return buf;
}
/*
	This accepts a pointer to a nil terminated string, and ensures that there is a
	newline as the last character. If there is not, a new buffer is allocated and
	the newline is added. A nil buffer, or one with fewer than two characters, is
	replaced by a buffer containing only a newline.

	Whenever a new buffer is allocated (or the allocation is attempted and fails),
	the buffer passed in is freed; the caller must not reference it again. The
	function returns a pointer which the caller should use, and must free. In the
	event of an error, a nil pointer is returned.
*/
static char* ensure_nlterm( char* buf ) {
	char*	nb;					// replacement buffer when one is needed
	size_t	len;

	if( buf == NULL || (len = strlen( buf )) < 2 ) {			// nothing useful; give back just a newline
		if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
			nb[0] = '\n';
			nb[1] = 0;
		}

		free( buf );											// original is being replaced; free of nil is harmless
		return nb;
	}

	if( buf[len-1] == '\n' ) {									// already terminated; use as is
		return buf;
	}

	fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
	if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
		memcpy( nb, buf, len );
		nb[len] = '\n';											// insert \n and nil into the two extra bytes we allocated
		nb[len+1] = 0;
	}

	free( buf );
	return nb;
}
258 Given a message type create a route table entry and add to the hash keyed on the
259 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
260 is the number of group slots to allocate in the entry.
262 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
264 rtable_ent_t* old_rte; // entry which was already in the table for the key
270 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
271 fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
274 memset( rte, 0, sizeof( *rte ) );
278 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
283 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
284 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
288 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
290 rte->rrgroups = NULL;
293 rte->nrrgroups = nrrgroups;
295 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
296 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
299 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
301 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
306 This accepts partially parsed information from an rte or mse record sent by route manager or read from
308 ts_field is the msg-type,sender field
309 subid is the integer subscription id
310 rr_field is the endpoint information for round robening message over
312 If all goes well, this will add an RTE to the table under construction.
314 The ts_field is checked to see if we should ingest this record. We ingest if one of
316 there is no sender info (a generic entry for all)
317 there is sender and our host:port matches one of the senders
318 the sender info is an IP address that matches one of our IP addresses
320 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
321 rtable_ent_t* rte; // route table entry added
324 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
328 int ngtoks; // number of tokens in the group list
329 int grp; // index into group list
331 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
332 rr_field = clip( rr_field );
334 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
335 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
336 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
338 key = build_rt_key( subid, atoi( ts_field ) );
340 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
342 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
343 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
344 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
346 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
347 rte->mtype = atoi( ts_field ); // capture mtype for debugging
349 for( grp = 0; grp < ngtoks; grp++ ) {
350 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
351 for( i = 0; i < ntoks; i++ ) {
352 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
353 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] );
354 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
361 if( DEBUG || (vlevel > 2) ) {
362 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
368 Trash_entry takes a partially parsed record from the input and
369 will delete the entry if the sender,mtype matches us or it's a
370 generic mtype. The refernce in the new table is removed and the
371 refcounter for the actual rte is decreased. If that ref count is
372 0 then the memory is freed (handled byh the del_rte call).
374 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
375 rtable_ent_t* rte; // route table entry to be 'deleted'
378 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
381 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
385 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
387 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
388 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
389 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
391 key = build_rt_key( subid, atoi( ts_field ) );
392 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
394 if( DEBUG || (vlevel > 1) ) {
395 fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
397 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
398 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
400 if( DEBUG || (vlevel > 1) ) {
401 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
405 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
410 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
411 the 'owner' which should be the dns name or IP address of an enpoint
412 the meid_list is a space separated list of me IDs
414 This function assumes the caller has vetted the pointers as needed.
416 For each meid in the list, an entry is pushed into the hash which references the owner
417 endpoint such that when the meid is used to route a message it references the endpoint
420 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
426 endpoint_t* ep; // endpoint struct for the owner
428 owner = clip( owner ); // ditch extra whitespace and trailing comments
429 meid_list = clip( meid_list );
431 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
432 for( i = 0; i < ntoks; i++ ) {
433 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
434 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
435 if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
436 fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
438 fprintf( stderr, "[WRN] rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
444 Given the tokens from an mme_del, delete the listed meid entries from the new
445 table. The list is a space separated list of meids.
447 The meids in the hash reference endpoints which are never deleted and so
448 the only thing that we need to do here is to remove the meid from the hash.
450 This function assumes the caller has vetted the pointers as needed.
452 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
458 if( rtab->hash == NULL ) {
462 meid_list = clip( meid_list );
464 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
465 for( i = 0; i < ntoks; i++ ) {
466 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
467 if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_del: meid deleted: %s\n", tokens[i] );
472 Parse a partially parsed meid record. Tokens[0] should be one of:
473 meid_map, mme_ar, mme_del.
475 static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
476 if( tokens == NULL || ntoks < 1 ) {
477 return; // silent but should never happen
480 if( ntoks < 2 ) { // must have at least two for any valid request record
481 fprintf( stderr, "[ERR] meid_parse: not enough tokens on %s record\n", tokens[0] );
485 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
486 tokens[1] = clip( tokens[1] );
487 if( *(tokens[1]) == 's' ) {
488 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
489 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] meid map start: dropping incomplete table\n" );
490 uta_rt_drop( ctx->new_rtable );
493 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
494 ctx->new_rtable->mupdates = 0;
495 if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] meid_parse: meid map start found\n" );
497 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
498 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
499 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
500 fprintf( stderr, "[ERR] meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
501 uta_rt_drop( ctx->new_rtable );
502 ctx->new_rtable = NULL;
506 if( DEBUG ) fprintf( stderr, "[DBUG] meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
509 if( ctx->new_rtable ) {
510 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
511 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
512 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
513 ctx->new_rtable = NULL;
514 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of meid map noticed\n" );
517 fprintf( stderr, "[DBUG] old route table:\n" );
518 rt_stats( ctx->old_rtable );
519 fprintf( stderr, "[DBUG] new route table:\n" );
520 rt_stats( ctx->rtable );
523 if( DEBUG ) fprintf( stderr, "[DBUG] end of meid map noticed, but one was not started!\n" );
524 ctx->new_rtable = NULL;
532 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
533 if( DEBUG ) fprintf( stderr, "[DBUG] meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
537 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
538 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
539 fprintf( stderr, "[ERR] meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
542 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
543 ctx->new_rtable->mupdates++;
546 if( strcmp( tokens[0], "mme_del" ) == 0 ) {
548 fprintf( stderr, "[ERR] meid_parse: mme_del record didn't have enough tokens\n" );
551 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
552 ctx->new_rtable->mupdates++;
557 Parse a single record recevied from the route table generator, or read
558 from a static route table file. Start records cause a new table to
559 be started (if a partial table was received it is discarded. Table
560 entry records are added to the currenly 'in progress' table, and an
561 end record causes the in progress table to be finalised and the
562 currently active table is replaced.
564 The updated table will be activated when the *|end record is encountered.
565 However, to allow for a "double" update, where both the meid map and the
566 route table must be updated at the same time, the end indication on a
567 route table (new or update) may specifiy "hold" which indicates that meid
568 map entries are to follow and the updated route table should be held as
569 pending until the end of the meid map is received and validated.
571 CAUTION: we are assuming that there is a single route/meid map generator
572 and as such only one type of update is received at a time; in other
573 words, the sender cannot mix update records and if there is more than
574 one sender process they must synchronise to avoid issues.
577 For a RT update, we expect:
578 newrt|{start|end [hold]}
579 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
580 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
581 mse| <mtype>[,sender] | <sub-id> | %meid
584 For a meid map update we expect:
586 meid_map | end | <count> | <md5-hash>
587 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
588 mme_del | <meid0> <meid1>...<meidn>
591 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
593 int ntoks; // number of tokens found in something
595 int grp; // group number
596 rtable_ent_t* rte; // route table entry added
598 char* tok; // pointer into a token or string
604 while( *buf && isspace( *buf ) ) { // skip leading whitespace
607 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
610 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
611 tokens[0] = clip( tokens[0] );
612 switch( *(tokens[0]) ) {
613 case 0: // ignore blanks
615 case '#': // and comment lines
618 case 'd': // del | [sender,]mtype | sub-id
619 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
624 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
628 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
629 ctx->new_rtable->updates++;
632 case 'n': // newrt|{start|end}
633 tokens[1] = clip( tokens[1] );
634 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
635 if( ctx->new_rtable ) {
636 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
637 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
638 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
639 ctx->new_rtable = NULL;
640 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
643 fprintf( stderr, "[DBUG] old route table:\n" );
644 rt_stats( ctx->old_rtable );
645 fprintf( stderr, "[DBUG] new route table:\n" );
646 rt_stats( ctx->rtable );
649 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
650 ctx->new_rtable = NULL;
652 } else { // start a new table.
653 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
654 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
655 uta_rt_drop( ctx->new_rtable );
658 ctx->new_rtable = NULL;
659 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table
660 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
664 case 'm': // mse entry or one of the meid_ records
665 if( strcmp( tokens[0], "mse" ) == 0 ) {
666 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
671 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
675 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
676 ctx->new_rtable->updates++;
678 meid_parser( ctx, tokens, ntoks, vlevel );
682 case 'r': // assume rt entry
683 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
687 ctx->new_rtable->updates++;
688 if( ntoks > 3 ) { // assume new entry with subid last
689 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
691 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
695 case 'u': // update current table, not a total replacement
696 tokens[1] = clip( tokens[1] );
697 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
698 if( ctx->new_rtable == NULL ) { // update table not in progress
703 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
704 fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
705 ctx->new_rtable->updates, tokens[2] );
706 uta_rt_drop( ctx->new_rtable );
707 ctx->new_rtable = NULL;
712 if( ctx->new_rtable ) {
713 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
714 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
715 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
716 ctx->new_rtable = NULL;
717 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
720 fprintf( stderr, "[DBUG] old route table:\n" );
721 rt_stats( ctx->old_rtable );
722 fprintf( stderr, "[DBUG] updated route table:\n" );
723 rt_stats( ctx->rtable );
726 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
727 ctx->new_rtable = NULL;
729 } else { // start a new table.
730 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
731 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
732 uta_rt_drop( ctx->new_rtable );
735 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
736 ctx->new_rtable->updates = 0; // init count of updates received
737 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
742 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
749 This function attempts to open a static route table in order to create a 'seed'
750 table during initialisation. The environment variable RMR_SEED_RT is expected
751 to contain the necessary path to the file. If missing, or if the file is empty,
752 no route table will be available until one is received from the generator.
754 This function is probably most useful for testing situations, or extreme
755 cases where the routes are static.
757 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
760 char* fbuf; // buffer with file contents
761 char* rec; // start of the record
762 char* eor; // end of the record
763 int rcount = 0; // record count for debug
765 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
769 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
770 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
774 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
775 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
777 *eor = '\n'; // will look like a blank line which is ok
781 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
783 if( (eor = strchr( rec, '\n' )) != NULL ) {
786 fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
787 fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
792 parse_rt_rec( ctx, rec, vlevel );
795 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
800 Callback driven for each named thing in a symtab. We collect the pointers to those
801 things for later use (cloning).
803 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
806 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
810 if( thing == NULL ) {
814 tl->names[tl->nused] = name; // the name/key
815 tl->things[tl->nused++] = thing; // save a reference to the thing
819 Called to delete a route table entry struct. We delete the array of endpoint
820 pointers, but NOT the endpoints referenced as those are referenced from
823 Route table entries can be concurrently referenced by multiple symtabs, so
824 the actual delete happens only if decrementing the rte's ref count takes it
825 to 0. Thus, it is safe to call this function across a symtab when cleaning up
826 the symtab, or overlaying an entry.
828 This function uses ONLY the pointer to the rte (thing) and ignores the other
829 information that symtab foreach function passes (st, entry, and data) which
830 means that it _can_ safetly be used outside of the foreach setting. If
831 the function is changed to depend on any of these three, then a stand-alone
832 rte_cleanup() function should be added and referenced by this, and refererences
833 to this outside of the foreach world should be changed.
835 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
839 if( (rte = (rtable_ent_t *) thing) == NULL ) {
844 if( rte->refs > 0 ) { // something still referencing, so it lives
848 if( rte->rrgroups ) { // clean up the round robin groups
849 for( i = 0; i < rte->nrrgroups; i++ ) {
850 if( rte->rrgroups[i] ) {
851 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
855 free( rte->rrgroups );
858 free( rte ); // finally, drop the potato
/*
	Read an entire file into a buffer. We assume for route table files
	they will be smallish and so this won't be a problem.
	Returns a pointer to the buffer, or nil. Caller must free.
	Terminates the buffer with a nil character for string processing.

	If we cannot open the file, or it is empty, we assume it's empty or missing
	and return an empty buffer, as opposed to a nil, so the caller can generate
	defaults or error if an empty/missing file isn't tolerated. If the file
	opens but cannot be stat'd, up to 8k is read.

	On failure nil is returned and errno is left set to the reason: ENOMEM if
	the buffer could not be allocated, otherwise the value set by read().
	Reading continues until the expected number of bytes have been read or
	end of file is reached, so a short read does not truncate the table.
*/
static char* uta_fib( char* fname ) {
	struct stat	stats;
	off_t		fsize = 8192;	// size of the file; default max if stat fails
	off_t		nread = 0;		// number of bytes read so far
	ssize_t		rlen;			// bytes returned by one read
	int			fd;
	int			saved_errno;
	char*		buf;			// input buffer

	if( (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {					// empty file
				close( fd );
				fd = -1;
			} else {
				fsize = stats.st_size;					// stat ok, save the file size
			}
		} else {
			fsize = 8192; 								// stat failed, we'll leave the file open and try to read a default max of 8k
		}
	}

	if( fd < 0 ) {											// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
			return NULL;
		}

		*buf = 0;
		return buf;
	}

	// add a size limit check here

	if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {		// enough to add nil char to make string
		close( fd );
		errno = ENOMEM;
		return NULL;
	}

	while( nread < fsize ) {
		rlen = read( fd, buf + nread, (size_t) (fsize - nread) );
		if( rlen < 0 ) {
			if( errno == EINTR ) {							// interrupted before any data; just try again
				continue;
			}

			saved_errno = errno;							// free/close may change it; caller reports it
			free( buf );
			close( fd );
			errno = saved_errno;
			return NULL;
		}

		if( rlen == 0 ) {									// end of file before the expected size; use what we have
			break;
		}

		nread += rlen;
	}

	buf[nread] = 0;

	close( fd );
	return buf;
}
923 Create and initialise a route table; Returns a pointer to the table struct.
925 static route_table_t* uta_rt_init( ) {
928 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
932 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
941 Clones one of the spaces in the given table.
942 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
943 Space is the space in the old table to copy. Space 0 uses an integer key and
944 references rte structs. All other spaces use a string key and reference endpoints.
946 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
947 endpoint_t* ep; // an endpoint
948 rtable_ent_t* rte; // a route table entry
949 void* sst; // source symtab
950 void* nst; // new symtab
951 thing_list_t things; // things from the space to copy
955 if( nrt == NULL ) { // make a new table if needed
960 if( srt == NULL ) { // source was nil, just give back the new table
964 things.nalloc = 2048;
966 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
967 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
968 if( things.things == NULL ) {
978 sst = srt->hash; // convenience pointers (src symtab)
981 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
983 if( DEBUG ) fprintf( stderr, "[DBUG] clone space cloned %d things in space %d\n", things.nused, space );
984 for( i = 0; i < things.nused; i++ ) {
985 if( space ) { // string key, epoint reference
986 ep = (endpoint_t *) things.things[i];
987 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
989 rte = (rtable_ent_t *) things.things[i];
990 rte->refs++; // rtes can be removed, so we track references
991 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
995 free( things.things );
996 free( (void *) things.names );
1001 Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1002 The endpoint and meid entries in the hash must be preserved.
1004 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1005 endpoint_t* ep; // an endpoint
1006 rtable_ent_t* rte; // a route table entry
1007 route_table_t* nrt = NULL; // new route table
1011 return uta_rt_init(); // no source to clone, just return an empty table
1014 nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE ); // allocate a new one, add endpoint refs
1015 rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new
1021 Creates a new route table and then clones _all_ of the given route table (references
1022 both endpoints AND the route table entries. Needed to support a partial update where
1023 some route table entries will not be deleted if not explicitly in the update and when
1024 we are adding/replacing meid references.
1026 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1027 endpoint_t* ep; // an endpoint
1028 rtable_ent_t* rte; // a route table entry
1029 route_table_t* nrt = NULL; // new route table
1033 return uta_rt_init(); // no source to clone, just return an empty table
1036 nrt = rt_clone_space( srt, nrt, RT_MT_SPACE ); // create new, clone all spaces to it
1037 rt_clone_space( srt, nrt, RT_NAME_SPACE );
1038 rt_clone_space( srt, nrt, RT_ME_SPACE );
1044 Given a name, find the endpoint struct in the provided route table.
1046 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1048 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1052 return rmr_sym_get( rt->hash, ep_name, 1 );
1056 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1058 static void uta_rt_drop( route_table_t* rt ) {
1063 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1064 rmr_sym_free( rt->hash ); // free all of the hash related data
1069 Look up and return the pointer to the endpoint stuct matching the given name.
1070 If not in the hash, a new endpoint is created, added to the hash. Should always
1073 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1076 if( !rt || !ep_name || ! *ep_name ) {
1077 fprintf( stderr, "[WRN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1082 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1083 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1084 fprintf( stderr, "[WRN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1089 ep->notify = 1; // show notification on first connection failure
1090 ep->open = 0; // not connected
1091 ep->addr = uta_h2ip( ep_name );
1092 ep->name = strdup( ep_name );
1093 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1094 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1096 rmr_sym_put( rt->hash, ep_name, 1, ep );
1104 Given a session id and message type build a key that can be used to look up the rte in the route
1105 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1107 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1110 if( sub_id == UNSET_SUBID ) {
1111 key = 0xffffffff00000000 | mtype;
1113 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);