3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
54 typedef struct thing_list {
60 // ---- debugging/testing -------------------------------------------------------------------------
63 Dump some stats for an endpoint in the RT. This is generally called to
64 verify endpoints after a table load/change.
66 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
70 if( (ep = (endpoint_t *) thing) == NULL ) {
74 if( (counter = (int *) vcounter) != NULL ) {
78 fprintf( stderr, "[DBUG] RMR sends: target=%s open=%d\n", ep->name, ep->open );
82 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
83 the 'source' information and is added to each message.
85 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
89 if( (ep = (endpoint_t *) thing) == NULL ) {
93 if( (id = (char *) vid) == NULL ) {
97 fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
98 (long long) time( NULL ),
102 ep->scounts[EPSC_GOOD],
103 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
104 ep->scounts[EPSC_FAIL],
105 ep->scounts[EPSC_TRANS] );
109 Dump stats for a route entry in the table.
111 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
113 rtable_ent_t* rte; // thing is really an rte
117 if( (rte = (rtable_ent_t *) thing) == NULL ) {
121 if( (counter = (int *) vcounter) != NULL ) {
125 mtype = rte->key & 0xffff;
126 sid = (int) (rte->key >> 32);
128 fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
132 Given a route table, cause some stats to be spit out.
134 static void rt_stats( route_table_t* rt ) {
138 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
142 counter = (int *) malloc( sizeof( int ) );
144 fprintf( stderr, "[DBUG] rtstats:\n" );
145 rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table
146 fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
149 rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries
150 fprintf( stderr, "[DBUG] %d entries\n", *counter );
156 Given a route table, cause endpoint counters to be written to stderr. The id
157 parm is written as the "source" in the output.
159 static void rt_epcounts( route_table_t* rt, char* id ) {
161 fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
165 rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
169 // ------------------------------------------------------------------------------------------------
171 Little ditty to trim whitespace and trailing comments. Like shell, trailing comments
172 must be at the start of a word (i.e. must be immediately preceded by whitespace).
174 static char* clip( char* buf ) {
177 while( *buf && isspace( *buf ) ) { // skip leading whitespace
181 if( (tok = strchr( buf, '#' )) != NULL ) {
183 return buf; // just push back; leading comment sym handled there
186 if( isspace( *(tok-1) ) ) {
191 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
198 This accepts a pointer to a nil terminated string, and ensures that there is a
199 newline as the last character. If there is not, a new buffer is allocated and
200 the newline is added. If a new buffer is allocated, the buffer passed in is
201 freed. The function returns a pointer which the caller should use, and must
202 free. In the event of an error, a nil pointer is returned.
204 static char* ensure_nlterm( char* buf ) {
210 if( buf == NULL || (len = strlen( buf )) < 2 ) {
211 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
216 if( buf[len-1] != '\n' ) {
217 fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
218 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
219 memcpy( nb, buf, len );
220 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
232 Given a message type create a route table entry and add to the hash keyed on the
233 message type. Once in the hash, endpoints can be added with uta_add_ep. Nrrgroups
234 is the number of group slots to allocate in the entry.
236 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
238 rtable_ent_t* old_rte; // entry which was already in the table for the key
244 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
245 fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
248 memset( rte, 0, sizeof( *rte ) );
252 if( nrrgroups <= 0 ) {
256 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
257 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
261 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
262 rte->nrrgroups = nrrgroups;
264 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
265 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
268 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
270 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
275 This accepts partially parsed information from a record sent by route manager or read from
277 ts_field is the msg-type,sender field
278 subid is the integer subscription id
279 rr_field is the endpoint information for round robining messages over
281 If all goes well, this will add an RTE to the table under construction.
283 The ts_field is checked to see if we should ingest this record. We ingest if one of
285 there is no sender info (a generic entry for all)
286 there is sender and our host:port matches one of the senders
287 the sender info is an IP address that matches one of our IP addresses
289 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
290 rtable_ent_t* rte; // route table entry added
293 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
297 int ngtoks; // number of tokens in the group list
298 int grp; // index into group list
300 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
301 rr_field = clip( rr_field );
303 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
304 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
305 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
307 key = build_rt_key( subid, atoi( ts_field ) );
309 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
311 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
312 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
314 for( grp = 0; grp < ngtoks; grp++ ) {
315 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
316 for( i = 0; i < ntoks; i++ ) {
317 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
318 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] );
319 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
326 if( DEBUG || (vlevel > 2) ) {
327 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
333 Trash_entry takes a partially parsed record from the input and
334 will delete the entry if the sender,mtype matches us or it's a
335 generic mtype. The reference in the new table is removed and the
336 refcounter for the actual rte is decreased. If that ref count is
337 0 then the memory is freed (handled by the del_rte call).
339 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
340 rtable_ent_t* rte; // route table entry to be 'deleted'
343 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
346 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
350 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
352 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
353 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
354 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
356 key = build_rt_key( subid, atoi( ts_field ) );
357 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
359 if( DEBUG || (vlevel > 1) ) {
360 fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
362 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
363 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
365 if( DEBUG || (vlevel > 1) ) {
366 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
370 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
375 Parse a single record received from the route table generator, or read
376 from a static route table file. Start records cause a new table to
377 be started (if a partial table was received it is discarded). Table
378 entry records are added to the currently 'in progress' table, and an
379 end record causes the in progress table to be finalised and the
380 currently active table is replaced.
382 We expect one of several types:
384 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
385 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
387 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
389 int ntoks; // number of tokens found in something
391 int grp; // group number
392 rtable_ent_t* rte; // route table entry added
394 char* gtokens[64]; // groups
395 char* tok; // pointer into a token or string
401 while( *buf && isspace( *buf ) ) { // skip leading whitespace
404 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
407 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
408 switch( *(tokens[0]) ) {
409 case 0: // ignore blanks
411 case '#': // and comment lines
414 case 'd': // del | [sender,]mtype | sub-id
415 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
420 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
424 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
425 ctx->new_rtable->updates++;
428 case 'n': // newrt|{start|end}
429 tokens[1] = clip( tokens[1] );
430 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
431 if( ctx->new_rtable ) {
432 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
433 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
434 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
435 ctx->new_rtable = NULL;
436 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
439 fprintf( stderr, "[DBUG] old route table:\n" );
440 rt_stats( ctx->old_rtable );
441 fprintf( stderr, "[DBUG] new route table:\n" );
442 rt_stats( ctx->rtable );
445 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
446 ctx->new_rtable = NULL;
448 } else { // start a new table.
449 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
450 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
451 uta_rt_drop( ctx->new_rtable );
455 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
457 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
459 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
463 case 'm': // assume mse entry
464 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
469 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
473 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
474 ctx->new_rtable->updates++;
477 case 'r': // assume rt entry
478 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
482 ctx->new_rtable->updates++;
483 if( ntoks > 3 ) { // assume new entry with subid last
484 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
486 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
490 case 'u': // update current table, not a total replacement
491 tokens[1] = clip( tokens[1] );
492 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
493 if( ctx->new_rtable == NULL ) { // update table not in progress
498 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
499 fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
500 ctx->new_rtable->updates, tokens[2] );
501 uta_rt_drop( ctx->new_rtable );
502 ctx->new_rtable = NULL;
507 if( ctx->new_rtable ) {
508 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
509 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
510 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
511 ctx->new_rtable = NULL;
512 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
515 fprintf( stderr, "[DBUG] old route table:\n" );
516 rt_stats( ctx->old_rtable );
517 fprintf( stderr, "[DBUG] updated route table:\n" );
518 rt_stats( ctx->rtable );
521 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
522 ctx->new_rtable = NULL;
524 } else { // start a new table.
525 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
526 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
527 uta_rt_drop( ctx->new_rtable );
531 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
533 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
536 ctx->new_rtable->updates = 0; // init count of updates received
537 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
542 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
549 This function attempts to open a static route table in order to create a 'seed'
550 table during initialisation. The environment variable RMR_SEED_RT is expected
551 to contain the necessary path to the file. If missing, or if the file is empty,
552 no route table will be available until one is received from the generator.
554 This function is probably most useful for testing situations, or extreme
555 cases where the routes are static.
557 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
560 char* fbuf; // buffer with file contents
561 char* rec; // start of the record
562 char* eor; // end of the record
563 int rcount = 0; // record count for debug
565 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
569 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
570 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
574 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
575 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
577 *eor = '\n'; // will look like a blank line which is ok
581 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
583 if( (eor = strchr( rec, '\n' )) != NULL ) {
586 fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
587 fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
592 parse_rt_rec( ctx, rec, vlevel );
595 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully parsed: %d records\n", rcount );
600 Callback driven for each named thing in a symtab. We collect the pointers to those
601 things for later use (cloning).
603 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
606 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
610 if( thing == NULL ) {
614 tl->things[tl->nused++] = thing; // save a reference to the thing
618 Called to delete a route table entry struct. We delete the array of endpoint
619 pointers, but NOT the endpoints referenced as those are referenced from
622 Route table entries can be concurrently referenced by multiple symtabs, so
623 the actual delete happens only if decrementing the rte's ref count takes it
624 to 0. Thus, it is safe to call this function across a symtab when cleaning up
625 the symtab, or overlaying an entry.
627 This function uses ONLY the pointer to the rte (thing) and ignores the other
628 information that symtab foreach function passes (st, entry, and data) which
629 means that it _can_ safely be used outside of the foreach setting. If
630 the function is changed to depend on any of these three, then a stand-alone
631 rte_cleanup() function should be added and referenced by this, and references
632 to this outside of the foreach world should be changed.
634 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
638 if( (rte = (rtable_ent_t *) thing) == NULL ) {
643 if( rte->refs > 0 ) { // something still referencing, so it lives
647 if( rte->rrgroups ) { // clean up the round robin groups
648 for( i = 0; i < rte->nrrgroups; i++ ) {
649 if( rte->rrgroups[i] ) {
650 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
654 free( rte->rrgroups );
657 free( rte ); // finally, drop the potato
661 Read an entire file into a buffer. We assume for route table files
662 they will be smallish and so this won't be a problem.
663 Returns a pointer to the buffer, or nil. Caller must free.
664 Terminates the buffer with a nil character for string processing.
666 If we cannot stat the file, we assume it's empty or missing and return
667 an empty buffer, as opposed to a nil, so the caller can generate defaults
668 or error if an empty/missing file isn't tolerated.
670 static char* uta_fib( char* fname ) {
672 off_t fsize = 8192; // size of the file
673 off_t nread; // number of bytes read
675 char* buf; // input buffer
677 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
678 if( fstat( fd, &stats ) >= 0 ) {
679 if( stats.st_size <= 0 ) { // empty file
683 fsize = stats.st_size; // stat ok, save the file size
686 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
690 if( fd < 0 ) { // didn't open or empty
691 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
699 // add a size limit check here
701 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
707 nread = read( fd, buf, fsize );
708 if( nread < 0 || nread > fsize ) { // failure of some kind
710 errno = EFBIG; // likely too much to handle
722 Create and initialise a route table; Returns a pointer to the table struct.
724 static route_table_t* uta_rt_init( ) {
727 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
731 if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
740 Clone (sort of) an existing route table. This is done to preserve the endpoint
741 names referenced in a table (and thus existing sessions) when a new set
742 of message type to endpoint name mappings is received. A new route table
743 with only endpoint name references is returned based on the active table in
746 static route_table_t* uta_rt_clone( route_table_t* srt ) {
747 endpoint_t* ep; // an endpoint
748 route_table_t* nrt; // new route table
749 void* sst; // source symtab
750 void* nst; // new symtab
758 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
762 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
767 things.nalloc = 2048;
769 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
770 if( things.things == NULL ) {
776 sst = srt->hash; // convenience pointers (src symtab)
779 rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
781 for( i = 0; i < things.nused; i++ ) {
782 ep = (endpoint_t *) things.things[i];
783 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
786 free( things.things );
791 Clones _all_ of the given route table (references both endpoints AND the route table
792 entries). Needed to support a partial update where some route table entries will not
793 be deleted if not explicitly in the update.
795 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
796 endpoint_t* ep; // an endpoint
797 rtable_ent_t* rte; // a route table entry
798 route_table_t* nrt; // new route table
799 void* sst; // source symtab
800 void* nst; // new symtab
801 thing_list_t things0; // things from space 0 (table entries)
802 thing_list_t things1; // things from space 1 (end points)
809 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
813 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
818 things0.nalloc = 2048;
820 things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
821 if( things0.things == NULL ) {
827 things1.nalloc = 2048;
829 things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
830 if( things1.things == NULL ) {
836 sst = srt->hash; // convenience pointers (src symtab)
839 rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
840 rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
842 for( i = 0; i < things0.nused; i++ ) {
843 rte = (rtable_ent_t *) things0.things[i];
844 rte->refs++; // rtes can be removed, so we track references
845 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
848 for( i = 0; i < things1.nused; i++ ) {
849 ep = (endpoint_t *) things1.things[i];
850 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
853 free( things0.things );
854 free( things1.things );
859 Given a name, find the endpoint struct in the provided route table.
861 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
863 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
867 return rmr_sym_get( rt->hash, ep_name, 1 );
871 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
873 static void uta_rt_drop( route_table_t* rt ) {
878 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
879 rmr_sym_free( rt->hash ); // free all of the hash related data
884 Look up and return the pointer to the endpoint struct matching the given name.
885 If not in the hash, a new endpoint is created and added to the hash. Should always
888 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
891 if( !rt || !ep_name || ! *ep_name ) {
892 fprintf( stderr, "[WRN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
897 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
898 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
899 fprintf( stderr, "[WRN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
904 ep->open = 0; // not connected
905 ep->addr = uta_h2ip( ep_name );
906 ep->name = strdup( ep_name );
907 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
908 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
910 rmr_sym_put( rt->hash, ep_name, 1, ep );
918 Given a session id and message type build a key that can be used to look up the rte in the route
919 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
921 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
924 if( sub_id == UNSET_SUBID ) {
925 key = 0xffffffff00000000 | mtype;
927 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);