3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
54 typedef struct thing_list {
61 // ---- debugging/testing -------------------------------------------------------------------------
64 Dump some stats for an endpoint in the RT. This is generally called to
65 verify endpoints after a table load/change.
67 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
71 if( (ep = (endpoint_t *) thing) == NULL ) {
75 if( (counter = (int *) vcounter) != NULL ) {
79 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
83 Called to count meid entries in the table. The meid points to an 'owning' endpoint
84 so we can list what we find
86 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
90 if( (ep = (endpoint_t *) thing) == NULL ) {
94 if( (counter = (int *) vcounter) != NULL ) {
98 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
102 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
103 the 'source' information and is added to each message.
105 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
109 if( (ep = (endpoint_t *) thing) == NULL ) {
113 if( (id = (char *) vid) == NULL ) {
117 rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
118 (long long) time( NULL ),
122 ep->scounts[EPSC_GOOD],
123 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
124 ep->scounts[EPSC_FAIL],
125 ep->scounts[EPSC_TRANS] );
129 Dump stats for a route entry in the table.
131 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
133 rtable_ent_t* rte; // thing is really an rte
137 if( (rte = (rtable_ent_t *) thing) == NULL ) {
141 if( (counter = (int *) vcounter) != NULL ) {
145 mtype = rte->key & 0xffff;
146 sid = (int) (rte->key >> 32);
148 rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
152 Given a route table, cause some stats to be spit out.
154 static void rt_stats( route_table_t* rt ) {
158 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
162 counter = (int *) malloc( sizeof( int ) );
164 rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
165 rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
166 rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
167 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
169 rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
171 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
172 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
174 rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
176 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
177 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
183 Given a route table, cause endpoint counters to be written to stderr. The id
184 parm is written as the "source" in the output.
186 static void rt_epcounts( route_table_t* rt, char* id ) {
188 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
192 rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
196 // ------------------------------------------------------------------------------------------------
198 Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
199 must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
201 static char* clip( char* buf ) {
204 while( *buf && isspace( *buf ) ) { // skip leading whitespace
208 if( (tok = strchr( buf, '#' )) != NULL ) {
210 return buf; // just push back; leading comment sym handled there
213 if( isspace( *(tok-1) ) ) {
218 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
225 This accepts a pointer to a nil terminated string, and ensures that there is a
226 newline as the last character. If there is not, a new buffer is allocated and
227 the newline is added. If a new buffer is allocated, the buffer passed in is
228 freed. The function returns a pointer which the caller should use, and must
229 free. In the event of an error, a nil pointer is returned.
231 static char* ensure_nlterm( char* buf ) {
236 if( buf == NULL || (len = strlen( buf )) < 2 ) {
237 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
242 if( buf[len-1] != '\n' ) {
243 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
244 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
245 memcpy( nb, buf, len );
246 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
258 Given a message type create a route table entry and add to the hash keyed on the
259 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
260 is the number of group slots to allocate in the entry.
262 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
264 rtable_ent_t* old_rte; // entry which was already in the table for the key
270 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
271 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
274 memset( rte, 0, sizeof( *rte ) );
278 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
283 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
287 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
289 rte->rrgroups = NULL;
292 rte->nrrgroups = nrrgroups;
294 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
295 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
298 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
300 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
305 This accepts partially parsed information from an rte or mse record sent by route manager or read from
307 ts_field is the msg-type,sender field
308 subid is the integer subscription id
309 rr_field is the endpoint information for round robening message over
311 If all goes well, this will add an RTE to the table under construction.
313 The ts_field is checked to see if we should ingest this record. We ingest if one of
315 there is no sender info (a generic entry for all)
316 there is sender and our host:port matches one of the senders
317 the sender info is an IP address that matches one of our IP addresses
319 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
320 rtable_ent_t* rte; // route table entry added
323 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
327 int ngtoks; // number of tokens in the group list
328 int grp; // index into group list
330 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
331 rr_field = clip( rr_field );
333 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
334 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
335 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
337 key = build_rt_key( subid, atoi( ts_field ) );
339 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
341 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
342 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
343 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
345 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
346 rte->mtype = atoi( ts_field ); // capture mtype for debugging
348 for( grp = 0; grp < ngtoks; grp++ ) {
349 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
350 for( i = 0; i < ntoks; i++ ) {
351 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
352 if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
353 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
360 if( DEBUG || (vlevel > 2) ) {
361 rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
367 Trash_entry takes a partially parsed record from the input and
368 will delete the entry if the sender,mtype matches us or it's a
369 generic mtype. The refernce in the new table is removed and the
370 refcounter for the actual rte is decreased. If that ref count is
371 0 then the memory is freed (handled byh the del_rte call).
373 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
374 rtable_ent_t* rte; // route table entry to be 'deleted'
377 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
380 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
384 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
386 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
387 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
388 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
390 key = build_rt_key( subid, atoi( ts_field ) );
391 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
393 if( DEBUG || (vlevel > 1) ) {
394 rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
396 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
397 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
399 if( DEBUG || (vlevel > 1) ) {
400 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
404 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
409 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
410 the 'owner' which should be the dns name or IP address of an enpoint
411 the meid_list is a space separated list of me IDs
413 This function assumes the caller has vetted the pointers as needed.
415 For each meid in the list, an entry is pushed into the hash which references the owner
416 endpoint such that when the meid is used to route a message it references the endpoint
419 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
425 endpoint_t* ep; // endpoint struct for the owner
427 owner = clip( owner ); // ditch extra whitespace and trailing comments
428 meid_list = clip( meid_list );
430 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
431 for( i = 0; i < ntoks; i++ ) {
432 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
433 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
434 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
436 rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
442 Given the tokens from an mme_del, delete the listed meid entries from the new
443 table. The list is a space separated list of meids.
445 The meids in the hash reference endpoints which are never deleted and so
446 the only thing that we need to do here is to remove the meid from the hash.
448 This function assumes the caller has vetted the pointers as needed.
450 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
456 if( rtab->hash == NULL ) {
460 meid_list = clip( meid_list );
462 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
463 for( i = 0; i < ntoks; i++ ) {
464 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
465 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
470 Parse a partially parsed meid record. Tokens[0] should be one of:
471 meid_map, mme_ar, mme_del.
473 static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
474 if( tokens == NULL || ntoks < 1 ) {
475 return; // silent but should never happen
478 if( ntoks < 2 ) { // must have at least two for any valid request record
479 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
483 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
484 tokens[1] = clip( tokens[1] );
485 if( *(tokens[1]) == 's' ) {
486 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
487 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
488 uta_rt_drop( ctx->new_rtable );
491 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
492 ctx->new_rtable->mupdates = 0;
493 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
495 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
496 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
497 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
498 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
499 uta_rt_drop( ctx->new_rtable );
500 ctx->new_rtable = NULL;
504 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
507 if( ctx->new_rtable ) {
508 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
509 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
510 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
511 ctx->new_rtable = NULL;
512 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
515 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
516 rt_stats( ctx->old_rtable );
517 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
518 rt_stats( ctx->rtable );
521 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
522 ctx->new_rtable = NULL;
530 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
531 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
535 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
536 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
537 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
540 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
541 ctx->new_rtable->mupdates++;
544 if( strcmp( tokens[0], "mme_del" ) == 0 ) {
546 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
549 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
550 ctx->new_rtable->mupdates++;
555 Parse a single record recevied from the route table generator, or read
556 from a static route table file. Start records cause a new table to
557 be started (if a partial table was received it is discarded. Table
558 entry records are added to the currenly 'in progress' table, and an
559 end record causes the in progress table to be finalised and the
560 currently active table is replaced.
562 The updated table will be activated when the *|end record is encountered.
563 However, to allow for a "double" update, where both the meid map and the
564 route table must be updated at the same time, the end indication on a
565 route table (new or update) may specifiy "hold" which indicates that meid
566 map entries are to follow and the updated route table should be held as
567 pending until the end of the meid map is received and validated.
569 CAUTION: we are assuming that there is a single route/meid map generator
570 and as such only one type of update is received at a time; in other
571 words, the sender cannot mix update records and if there is more than
572 one sender process they must synchronise to avoid issues.
575 For a RT update, we expect:
576 newrt|{start|end [hold]}
577 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
578 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
579 mse| <mtype>[,sender] | <sub-id> | %meid
582 For a meid map update we expect:
584 meid_map | end | <count> | <md5-hash>
585 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
586 mme_del | <meid0> <meid1>...<meidn>
589 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
591 int ntoks; // number of tokens found in something
593 int grp; // group number
594 rtable_ent_t* rte; // route table entry added
596 char* tok; // pointer into a token or string
602 while( *buf && isspace( *buf ) ) { // skip leading whitespace
605 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
608 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
609 tokens[0] = clip( tokens[0] );
610 switch( *(tokens[0]) ) {
611 case 0: // ignore blanks
613 case '#': // and comment lines
616 case 'd': // del | [sender,]mtype | sub-id
617 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
622 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
626 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
627 ctx->new_rtable->updates++;
630 case 'n': // newrt|{start|end}
631 tokens[1] = clip( tokens[1] );
632 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
633 if( ctx->new_rtable ) {
634 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
635 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
636 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
637 ctx->new_rtable = NULL;
638 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
641 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
642 rt_stats( ctx->old_rtable );
643 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
644 rt_stats( ctx->rtable );
647 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
648 ctx->new_rtable = NULL;
650 } else { // start a new table.
651 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
652 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
653 uta_rt_drop( ctx->new_rtable );
656 ctx->new_rtable = NULL;
657 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table
658 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
662 case 'm': // mse entry or one of the meid_ records
663 if( strcmp( tokens[0], "mse" ) == 0 ) {
664 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
669 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
673 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
674 ctx->new_rtable->updates++;
676 meid_parser( ctx, tokens, ntoks, vlevel );
680 case 'r': // assume rt entry
681 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
685 ctx->new_rtable->updates++;
686 if( ntoks > 3 ) { // assume new entry with subid last
687 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
689 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
693 case 'u': // update current table, not a total replacement
694 tokens[1] = clip( tokens[1] );
695 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
696 if( ctx->new_rtable == NULL ) { // update table not in progress
701 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
702 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
703 ctx->new_rtable->updates, tokens[2] );
704 uta_rt_drop( ctx->new_rtable );
705 ctx->new_rtable = NULL;
710 if( ctx->new_rtable ) {
711 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
712 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
713 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
714 ctx->new_rtable = NULL;
715 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
718 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
719 rt_stats( ctx->old_rtable );
720 rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
721 rt_stats( ctx->rtable );
724 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
725 ctx->new_rtable = NULL;
727 } else { // start a new table.
728 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
729 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
730 uta_rt_drop( ctx->new_rtable );
733 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
734 ctx->new_rtable->updates = 0; // init count of updates received
735 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
740 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
747 This function attempts to open a static route table in order to create a 'seed'
748 table during initialisation. The environment variable RMR_SEED_RT is expected
749 to contain the necessary path to the file. If missing, or if the file is empty,
750 no route table will be available until one is received from the generator.
752 This function is probably most useful for testing situations, or extreme
753 cases where the routes are static.
755 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
758 char* fbuf; // buffer with file contents
759 char* rec; // start of the record
760 char* eor; // end of the record
761 int rcount = 0; // record count for debug
763 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
767 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
768 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
772 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
773 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
775 *eor = '\n'; // will look like a blank line which is ok
779 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
781 if( (eor = strchr( rec, '\n' )) != NULL ) {
784 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
785 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
790 parse_rt_rec( ctx, rec, vlevel );
793 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
798 Callback driven for each named thing in a symtab. We collect the pointers to those
799 things for later use (cloning).
801 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
804 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
808 if( thing == NULL ) {
812 tl->names[tl->nused] = name; // the name/key
813 tl->things[tl->nused++] = thing; // save a reference to the thing
817 Called to delete a route table entry struct. We delete the array of endpoint
818 pointers, but NOT the endpoints referenced as those are referenced from
821 Route table entries can be concurrently referenced by multiple symtabs, so
822 the actual delete happens only if decrementing the rte's ref count takes it
823 to 0. Thus, it is safe to call this function across a symtab when cleaning up
824 the symtab, or overlaying an entry.
826 This function uses ONLY the pointer to the rte (thing) and ignores the other
827 information that symtab foreach function passes (st, entry, and data) which
828 means that it _can_ safetly be used outside of the foreach setting. If
829 the function is changed to depend on any of these three, then a stand-alone
830 rte_cleanup() function should be added and referenced by this, and refererences
831 to this outside of the foreach world should be changed.
833 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
837 if( (rte = (rtable_ent_t *) thing) == NULL ) {
842 if( rte->refs > 0 ) { // something still referencing, so it lives
846 if( rte->rrgroups ) { // clean up the round robin groups
847 for( i = 0; i < rte->nrrgroups; i++ ) {
848 if( rte->rrgroups[i] ) {
849 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
853 free( rte->rrgroups );
856 free( rte ); // finally, drop the potato
860 Read an entire file into a buffer. We assume for route table files
861 they will be smallish and so this won't be a problem.
862 Returns a pointer to the buffer, or nil. Caller must free.
863 Terminates the buffer with a nil character for string processing.
865 If we cannot stat the file, we assume it's empty or missing and return
866 an empty buffer, as opposed to a nil, so the caller can generate defaults
867 or error if an empty/missing file isn't tolerated.
869 static char* uta_fib( char* fname ) {
871 off_t fsize = 8192; // size of the file
872 off_t nread; // number of bytes read
874 char* buf; // input buffer
876 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
877 if( fstat( fd, &stats ) >= 0 ) {
878 if( stats.st_size <= 0 ) { // empty file
882 fsize = stats.st_size; // stat ok, save the file size
885 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
889 if( fd < 0 ) { // didn't open or empty
890 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
898 // add a size limit check here
900 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
906 nread = read( fd, buf, fsize );
907 if( nread < 0 || nread > fsize ) { // failure of some kind
909 errno = EFBIG; // likely too much to handle
921 Create and initialise a route table; Returns a pointer to the table struct.
923 static route_table_t* uta_rt_init( ) {
926 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
930 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
939 Clones one of the spaces in the given table.
940 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
941 Space is the space in the old table to copy. Space 0 uses an integer key and
942 references rte structs. All other spaces use a string key and reference endpoints.
944 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
945 endpoint_t* ep; // an endpoint
946 rtable_ent_t* rte; // a route table entry
947 void* sst; // source symtab
948 void* nst; // new symtab
949 thing_list_t things; // things from the space to copy
953 if( nrt == NULL ) { // make a new table if needed
958 if( srt == NULL ) { // source was nil, just give back the new table
962 things.nalloc = 2048;
964 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
965 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
966 if( things.things == NULL ) {
976 sst = srt->hash; // convenience pointers (src symtab)
979 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
981 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
982 for( i = 0; i < things.nused; i++ ) {
983 if( space ) { // string key, epoint reference
984 ep = (endpoint_t *) things.things[i];
985 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
987 rte = (rtable_ent_t *) things.things[i];
988 rte->refs++; // rtes can be removed, so we track references
989 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
993 free( things.things );
994 free( (void *) things.names );
999 Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1000 The endpoint and meid entries in the hash must be preserved.
1002 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1003 endpoint_t* ep; // an endpoint
1004 rtable_ent_t* rte; // a route table entry
1005 route_table_t* nrt = NULL; // new route table
1009 return uta_rt_init(); // no source to clone, just return an empty table
1012 nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE ); // allocate a new one, add endpoint refs
1013 rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new
1019 Creates a new route table and then clones _all_ of the given route table (references
1020 both endpoints AND the route table entries. Needed to support a partial update where
1021 some route table entries will not be deleted if not explicitly in the update and when
1022 we are adding/replacing meid references.
1024 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1025 endpoint_t* ep; // an endpoint
1026 rtable_ent_t* rte; // a route table entry
1027 route_table_t* nrt = NULL; // new route table
1031 return uta_rt_init(); // no source to clone, just return an empty table
1034 nrt = rt_clone_space( srt, nrt, RT_MT_SPACE ); // create new, clone all spaces to it
1035 rt_clone_space( srt, nrt, RT_NAME_SPACE );
1036 rt_clone_space( srt, nrt, RT_ME_SPACE );
1042 Given a name, find the endpoint struct in the provided route table.
1044 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1046 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1050 return rmr_sym_get( rt->hash, ep_name, 1 );
1054 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1056 static void uta_rt_drop( route_table_t* rt ) {
1061 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1062 rmr_sym_free( rt->hash ); // free all of the hash related data
1067 Look up and return the pointer to the endpoint stuct matching the given name.
1068 If not in the hash, a new endpoint is created, added to the hash. Should always
1071 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1074 if( !rt || !ep_name || ! *ep_name ) {
1075 rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1080 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1081 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1082 rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1087 ep->notify = 1; // show notification on first connection failure
1088 ep->open = 0; // not connected
1089 ep->addr = uta_h2ip( ep_name );
1090 ep->name = strdup( ep_name );
1091 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1092 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1094 rmr_sym_put( rt->hash, ep_name, 1, ep );
1102 Given a session id and message type build a key that can be used to look up the rte in the route
1103 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1105 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1108 if( sub_id == UNSET_SUBID ) {
1109 key = 0xffffffff00000000 | mtype;
1111 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);