3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
// Opening line of the list structure used when gathering pointers out of a symtab.
// NOTE(review): the member declarations and the closing line of this typedef are not part of
// this listing; elsewhere in the file a thing_list_t is used with nalloc, nused and things
// fields -- presumably this is that type, confirm against the full source.
54 typedef struct thing_list {
60 // ---- debugging/testing -------------------------------------------------------------------------
63 Dump stats for an endpoint in the RT.
// Symtab foreach callback which writes one endpoint's name and open flag to stderr.
// thing is treated as an endpoint_t pointer and vcounter as an int pointer.
// NOTE(review): the declarations, the bodies of the two tests below and the closing braces
// are not part of this listing, so what each branch does cannot be confirmed from here.
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
// a nil endpoint pointer is tested for before the endpoint is touched
69 if( (ep = (endpoint_t *) thing) == NULL ) {
// the counter is optional; it is only used when the caller supplied one
73 if( (counter = (int *) vcounter) != NULL ) {
77 fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
81 Dump stats for a route entry in the table.
// Symtab foreach callback which writes one route table entry to stderr: its key, the message
// type and subscription id pulled out of the key, the group count and the reference count.
// thing is treated as an rtable_ent_t pointer and vcounter as an int pointer.
// NOTE(review): declarations, branch bodies and closing braces are not part of this listing.
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
85 rtable_ent_t* rte; // thing is really an rte
// a nil entry pointer is tested for before the entry is touched
89 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// the counter is optional; it is only used when the caller supplied one
93 if( (counter = (int *) vcounter) != NULL ) {
// split the key for display: low 16 bits shown as the message type, upper 32 bits as the sub id
// NOTE(review): build_rt_key masks mtype with 0xffffffff when building the key, so this 0xffff
// mask shows only the low 16 bits of the stored message type -- confirm that is intended.
97 mtype = rte->key & 0xffff;
98 sid = (int) (rte->key >> 32);
// NOTE(review): key is printed with %lx; that matches a 64 bit key only where long is 64 bits.
100 fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
104 Given a route table, cause some stats to be spit out.
106 static void rt_stats( route_table_t* rt ) {
110 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
114 counter = (int *) malloc( sizeof( int ) );
116 fprintf( stderr, "[DBUG] rtstats:\n" );
117 rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table
118 fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
121 rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries
122 fprintf( stderr, "[DBUG] %d entries\n", *counter );
128 // ------------------------------------------------------------------------------------------------
130 Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
131 must be at the start of a word (i.e. must be immediately preceded by whitespace).
/*
	Trim leading whitespace, a trailing comment, and trailing whitespace from buf.
	The trimming is done in place and a pointer to the first non-blank character
	(inside the caller's buffer) is returned; a nil buf yields a nil return.

	Like shell, a '#' only starts a comment when it begins a word (i.e. it is
	immediately preceded by whitespace); every '#' in the string is considered,
	not just the first. When the first non-blank character is '#' the string is
	pushed back untouched from that point as the caller is expected to recognise
	a whole-line comment.
*/
static char* clip( char* buf ) {
	char*	tok;				// pointer at a candidate comment symbol
	size_t	len;				// length of what remains after the comment is cut

	if( buf == NULL ) {
		return NULL;
	}

	while( *buf && isspace( (unsigned char) *buf ) ) {		// skip leading whitespace (cast keeps isspace defined for chars > 127)
		buf++;
	}

	if( *buf == '#' ) {
		return buf;								// just push back; leading comment sym handled by the caller
	}

	for( tok = strchr( buf, '#' ); tok != NULL; tok = strchr( tok + 1, '#' ) ) {
		if( isspace( (unsigned char) *(tok-1) ) ) {			// tok > buf is certain here as buf[0] is not '#'
			*tok = 0;							// comment starts a word; chop it and everything after
			break;
		}
	}

	len = strlen( buf );
	while( len > 0 && isspace( (unsigned char) buf[len-1] ) ) {		// trim trailing spaces too; safe when the string is empty
		len--;
	}
	buf[len] = 0;

	return buf;
}
157 This accepts a pointer to a nil terminated string, and ensures that there is a
158 newline as the last character. If there is not, a new buffer is allocated and
159 the newline is added. If a new buffer is allocated, the buffer passed in is
160 freed. The function returns a pointer which the caller should use, and must
161 free. In the event of an error, a nil pointer is returned.
/*
	Accepts a pointer to a nil terminated string (allocated by the caller) and
	ensures that its last character is a newline.

	- A nil pointer, or a string of fewer than two characters, is replaced with a
	  new buffer containing only a newline.
	- A string already ending in a newline is returned as is.
	- Otherwise a new buffer, with the newline added, is allocated and a warning
	  is written to stderr.

	Whenever a replacement is needed the buffer passed in is freed (even if the
	allocation of the replacement fails), so the caller must use, and eventually
	free, only the pointer returned. A nil pointer is returned on allocation failure.
*/
static char* ensure_nlterm( char* buf ) {
	char*	nb;					// new buffer when one is needed
	size_t	len = 0;			// length of the string passed in

	if( buf != NULL ) {
		len = strlen( buf );
	}

	if( len < 2 ) {											// nothing usable; hand back just a newline
		if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
			nb[0] = '\n';
			nb[1] = 0;
		}

		free( buf );										// replaced; original must not be left dangling with the caller
		return nb;
	}

	if( buf[len-1] == '\n' ) {								// already properly terminated
		return buf;
	}

	fprintf( stderr, "[WARN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
	if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
		memcpy( nb, buf, len );
		nb[len] = '\n';										// insert \n and nil into the two extra bytes we allocated
		nb[len+1] = 0;
	}

	free( buf );
	return nb;
}
190 Given a message type create a route table entry and add to the hash keyed on the
191 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
192 is the number of group slots to allocate in the entry.
// Allocate a route table entry with an array of nrrgroups group pointers and map it into
// rt->hash under key, replacing (and handing to del_rte) any entry already mapped there.
// NOTE(review): this listing omits several original lines (remaining declarations, the bodies
// of the failure branches, the return statements); only the visible statements are described.
194 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
196 rtable_ent_t* old_rte; // entry which was already in the table for the key
// the entry itself; a failed allocation is reported on stderr
202 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
203 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
// start from an all zero entry
206 memset( rte, 0, sizeof( *rte ) );
// a non-positive group count is special cased (the replacement value is on a line not shown)
210 if( nrrgroups <= 0 ) {
// the array of group pointers; a failed allocation is reported on stderr
// NOTE(review): whether rte is released on this failure path is not visible here -- confirm.
214 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
215 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
// every group slot starts out nil
219 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
220 rte->nrrgroups = nrrgroups;
// an entry already in the table for this key is pulled out and passed to del_rte
222 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
223 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
226 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
228 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
233 This accepts partially parsed information from a record sent by route manager or read from
235 ts_field is the msg-type,sender field
236 subid is the integer subscription id
237 rr_field is the endpoint information for round-robining messages over
239 If all goes well, this will add an RTE to the table under construction.
241 The ts_field is checked to see if we should ingest this record. We ingest if one of
243 there is no sender info (a generic entry for all)
244 there is sender and our host:port matches one of the senders
245 the sender info is an IP address that matches one of our IP addresses
247 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
248 rtable_ent_t* rte; // route table entry added
251 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
255 int ngtoks; // number of tokens in the group list
256 int grp; // index into group list
258 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
259 rr_field = clip( rr_field );
261 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
262 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
263 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
265 key = build_rt_key( subid, atoi( ts_field ) );
267 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
269 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
270 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
272 for( grp = 0; grp < ngtoks; grp++ ) {
273 if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
274 for( i = 0; i < ntoks; i++ ) {
275 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field );
276 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
282 if( DEBUG || (vlevel > 2) )
283 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
288 Trash_entry takes a partially parsed record from the input and
289 will delete the entry if the sender,mtype matches us or it's a
290 generic mtype. The reference in the new table is removed and the
291 refcounter for the actual rte is decreased. If that ref count is
292 0 then the memory is freed (handled by the del_rte call).
// Remove the route table entry named by a partially parsed delete record from the table
// under construction (ctx->new_rtable) when the record applies to this process.
//   ts_field   the "msg-type[,sender...]" field (clipped in place)
//   subid      the subscription id used with the message type to build the key
//   vlevel     verbosity; values above 1 cause debug lines on stderr
// NOTE(review): this listing omits several original lines (a declaration, branch bodies,
// else lines and closing braces); only the visible statements are described.
294 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
295 rtable_ent_t* rte; // route table entry to be 'deleted'
298 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
// nothing can be done without a context, a table under construction and its hash
301 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
305 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
// same applicability test that build_entry uses
307 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
308 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
309 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
311 key = build_rt_key( subid, atoi( ts_field ) );
312 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
314 if( DEBUG || (vlevel > 1) ) {
315 fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
// NOTE(review): uta_add_rte treats rmr_sym_pull as removing the entry it returns; if that is
// so, this ndel is redundant after the pull above -- confirm against the symtab implementation.
317 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
318 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
320 if( DEBUG || (vlevel > 1) ) {
321 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
325 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
330 Parse a single record received from the route table generator, or read
331 from a static route table file. Start records cause a new table to
332 be started (if a partial table was received it is discarded). Table
333 entry records are added to the currently 'in progress' table, and an
334 end record causes the in progress table to be finalised and the
335 currently active table is replaced.
337 We expect one of several types:
339 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
340 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
// Parse one '|' separated route table record (buf is modified in place) and apply it to the
// tables hanging off of ctx. The first character of the first field selects the action:
//   'd'  delete an entry from the table under construction (trash_entry)
//   'n'  newrt start/end: begin a replacement table, or make the one being built active
//   'm'  mse record: add an entry with a subscription id (build_entry)
//   'r'  rte record: add an entry, with or without a subscription id (build_entry)
//   'u'  update start/end: begin an update of the active table, or make the update active
// Blank and '#' lines are ignored; anything else is reported when DEBUG is set.
// vlevel above 1 causes progress lines on stderr.
// NOTE(review): this listing omits many original lines (some declarations, break statements,
// else lines, field count tests and closing braces); the comments below describe only what
// the visible statements show.
342 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
344 int ntoks; // number of tokens found in something
346 int grp; // group number
347 rtable_ent_t* rte; // route table entry added
349 char* gtokens[64]; // groups
350 char* tok; // pointer into a token or string
356 while( *buf && isspace( *buf ) ) { // skip leading whitespace
359 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
362 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
363 switch( *(tokens[0]) ) {
364 case 0: // ignore blanks
366 case '#': // and comment lines
369 case 'd': // del | [sender,]mtype | sub-id
370 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
375 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
379 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
380 ctx->new_rtable->updates++;
// NOTE(review): tokens[1] is used on the very next original line after the case label, so no
// field count test can sit between them; a record with a single field would use an unset token.
383 case 'n': // newrt|{start|end}
384 tokens[1] = clip( tokens[1] );
385 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
386 if( ctx->new_rtable ) {
// rotate: old is dropped, active becomes old, the table being built becomes active
387 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
388 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
389 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
390 ctx->new_rtable = NULL;
391 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
394 fprintf( stderr, "[DBUG] old route table:\n" );
395 rt_stats( ctx->old_rtable );
396 fprintf( stderr, "[DBUG] new route table:\n" );
397 rt_stats( ctx->rtable );
400 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
401 ctx->new_rtable = NULL;
403 } else { // start a new table.
404 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
405 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
406 uta_rt_drop( ctx->new_rtable );
// the new table starts either from the active table's endpoints or completely empty
410 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
412 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just create empty
414 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
418 case 'm': // assume mse entry
419 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
424 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
// mse field order: mtype[,sender] | sub-id | endpoint groups
428 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
429 ctx->new_rtable->updates++;
432 case 'r': // assume rt entry
433 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
// NOTE(review): no field count message is visible for this case the way there is for 'd' and
// 'm'; tokens[2] is used below -- confirm that ntoks is checked on a line not shown here.
437 ctx->new_rtable->updates++;
438 if( ntoks > 3 ) { // assume new entry with subid last
439 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
441 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
// NOTE(review): as with 'n', tokens[1] is used on the line directly after the case label.
445 case 'u': // update current table, not a total replacement
446 tokens[1] = clip( tokens[1] );
447 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
448 if( ctx->new_rtable == NULL ) { // update table not in progress
// the end record carries the number of records sent; a mismatch discards the update
453 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
454 fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
455 ctx->new_rtable->updates, tokens[2] );
456 uta_rt_drop( ctx->new_rtable );
457 ctx->new_rtable = NULL;
462 if( ctx->new_rtable ) {
// same rotation as for a full table: old dropped, active becomes old, update becomes active
463 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
464 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
465 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
466 ctx->new_rtable = NULL;
467 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
470 fprintf( stderr, "[DBUG] old route table:\n" );
471 rt_stats( ctx->old_rtable );
472 fprintf( stderr, "[DBUG] updated route table:\n" );
473 rt_stats( ctx->rtable );
476 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
477 ctx->new_rtable = NULL;
479 } else { // start a new table.
480 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
481 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
482 uta_rt_drop( ctx->new_rtable );
// an update starts from everything in the active table (entries and endpoints), or empty
486 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
488 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just create empty
// NOTE(review): new_rtable is dereferenced here; whether a nil result from the clone/init
// calls above is tested on a line not shown cannot be confirmed from this listing.
491 ctx->new_rtable->updates = 0; // init count of updates received
492 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
497 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
504 This function attempts to open a static route table in order to create a 'seed'
505 table during initialisation. The environment variable RMR_SEED_RT is expected
506 to contain the necessary path to the file. If missing, or if the file is empty,
507 no route table will be available until one is received from the generator.
509 This function is probably most useful for testing situations, or extreme
510 cases where the routes are static.
// Load a seed route table: the file named by the ENV_SEED_RT environment variable is read
// into one buffer (uta_fib), forced to end with a newline (ensure_nlterm), split into
// newline terminated records, and each record is handed to parse_rt_rec.
//   ctx      context whose tables parse_rt_rec updates
//   vlevel   verbosity passed through to parse_rt_rec
// NOTE(review): this listing omits several original lines (a declaration, branch bodies,
// returns and closing braces), including whatever releases fbuf; only the visible
// statements are described.
512 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
515 char* fbuf; // buffer with file contents
516 char* rec; // start of the record
517 char* eor; // end of the record
518 int rcount = 0; // record count for debug
// no environment variable means no seed table
520 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
524 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
525 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
529 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
530 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
532 *eor = '\n'; // will look like a blank line which is ok
// walk the buffer one record at a time; eor marks the newline which ends the current record
536 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
538 if( (eor = strchr( rec, '\n' )) != NULL ) {
541 fprintf( stderr, "[WARN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
542 fprintf( stderr, "[WARN] rmr read_static: seed route table not used: %s\n", fname );
547 parse_rt_rec( ctx, rec, vlevel );
550 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully parsed: %d records\n", rcount );
555 Callback driven for each named thing in a symtab. We collect the pointers to those
556 things for later use (cloning).
558 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
561 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
565 if( thing == NULL ) {
569 tl->things[tl->nused++] = thing; // save a reference to the thing
573 Called to delete a route table entry struct. We delete the array of endpoint
574 pointers, but NOT the endpoints referenced as those are referenced from
577 Route table entries can be concurrently referenced by multiple symtabs, so
578 the actual delete happens only if decrementing the rte's ref count takes it
579 to 0. Thus, it is safe to call this function across a symtab when cleaning up
580 the symtab, or overlaying an entry.
582 This function uses ONLY the pointer to the rte (thing) and ignores the other
583 information that symtab foreach function passes (st, entry, and data) which
584 means that it _can_ safely be used outside of the foreach setting. If
585 the function is changed to depend on any of these three, then a stand-alone
586 rte_cleanup() function should be added and referenced by this, and references
587 to this outside of the foreach world should be changed.
589 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
593 if( (rte = (rtable_ent_t *) thing) == NULL ) {
598 if( rte->refs > 0 ) { // something still referencing, so it lives
602 if( rte->rrgroups ) { // clean up the round robin groups
603 for( i = 0; i < rte->nrrgroups; i++ ) {
604 if( rte->rrgroups[i] ) {
605 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
609 free( rte->rrgroups );
612 free( rte ); // finally, drop the potato
616 Read an entire file into a buffer. We assume for route table files
617 they will be smallish and so this won't be a problem.
618 Returns a pointer to the buffer, or nil. Caller must free.
619 Terminates the buffer with a nil character for string processing.
621 If we cannot stat the file, we assume it's empty or missing and return
622 an empty buffer, as opposed to a nil, so the caller can generate defaults
623 or error if an empty/missing file isn't tolerated.
/*
	Read an entire file into a buffer. We assume that route table files are
	smallish and so this won't be a problem.

	Returns a pointer to a nil terminated buffer which the caller must free, or
	a nil pointer on failure (allocation or read error) with errno left
	describing the problem.

	If the file cannot be opened, or is empty, an empty (zero length) string is
	returned rather than a nil pointer so that the caller can generate defaults,
	or error if an empty/missing file isn't tolerated. If the file opens, but
	cannot be stat'd, up to 8k is read.

	The read is repeated until the expected number of bytes have been received,
	or end of file is reached, as a single read is allowed to return less than
	was asked for.
*/
static char* uta_fib( char* fname ) {
	struct stat	stats;
	off_t		fsize = 8192;		// size of the file; default is used when the stat fails
	off_t		total = 0;			// number of bytes read so far
	ssize_t		nread = 0;			// number of bytes returned by the most recent read
	int			fd = -1;
	int			saved_errno;		// close/free must not hide the reason for a failure
	char*		buf;				// input buffer

	if( fname != NULL && (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {					// empty file
				close( fd );
				fd = -1;
			} else {
				fsize = stats.st_size;					// stat ok, save the file size
			}
		}												// stat failed: leave the file open and try for the default max
	}

	if( fd < 0 ) {										// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) != NULL ) {
			*buf = 0;
		}

		return buf;
	}

	// add a size limit check here

	if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {		// enough to add nil char to make string
		close( fd );
		errno = ENOMEM;
		return NULL;
	}

	while( total < fsize ) {
		nread = read( fd, buf + total, (size_t) (fsize - total) );
		if( nread > 0 ) {
			total += nread;
			continue;
		}

		if( nread < 0 && errno == EINTR ) {				// interrupted before anything arrived; just try again
			continue;
		}

		break;											// end of file (0) or a hard error (< 0)
	}

	if( nread < 0 ) {									// failure of some kind
		saved_errno = errno;
		free( buf );
		close( fd );
		errno = saved_errno;
		return NULL;
	}

	buf[total] = 0;

	close( fd );
	return buf;
}
677 Create and initialise a route table; Returns a pointer to the table struct.
// Allocate a route table struct and give it a symtab created with rmr_sym_alloc( 509 ).
// NOTE(review): the declaration, the failure handling inside each test and the return
// statements are not part of this listing.
// NOTE(review): the struct comes from malloc and no zeroing is visible here; callers in this
// file increment the table's updates field -- confirm it is initialised somewhere.
679 static route_table_t* uta_rt_init( ) {
682 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
686 if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
695 Clone (sort of) an existing route table. This is done to preserve the endpoint
696 names referenced in a table (and thus existing sessions) when a new set
697 of message type to endpoint name mappings is received. A new route table
698 with only endpoint name references is returned based on the active table in
// Build a new route table which references the endpoints (symtab class 1) of srt, but none
// of its route table entries: the endpoint pointers are collected from the source symtab
// with collect_things and each is put into the new symtab under the endpoint's name.
// NOTE(review): this listing omits several original lines (remaining declarations, failure
// handling, the assignment of nst, the return); only the visible statements are described.
701 static route_table_t* uta_rt_clone( route_table_t* srt ) {
702 endpoint_t* ep; // an endpoint
703 route_table_t* nrt; // new route table
704 void* sst; // source symtab
705 void* nst; // new symtab
713 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
717 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// scratch list which receives the endpoint pointers; fixed at 2048 slots
// NOTE(review): the visible collect_things stores without testing nused against nalloc, so a
// table with more than 2048 endpoints would write past this array -- confirm.
722 things.nalloc = 2048;
724 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
725 if( things.things == NULL ) {
731 sst = srt->hash; // convenience pointers (src symtab)
734 rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
// endpoints are shared between tables: the same pointer goes into the new symtab
736 for( i = 0; i < things.nused; i++ ) {
737 ep = (endpoint_t *) things.things[i];
738 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the scratch list is released; the endpoints themselves now belong to both tables
741 free( things.things );
746 Clones _all_ of the given route table (references both endpoints AND the route table
747 entries. Needed to support a partial update where some route table entries will not
748 be deleted if not explicitly in the update.
// Build a new route table which references both the route table entries (symtab class 0)
// and the endpoints (symtab class 1) of srt. Entries are shared, not copied: each entry's
// reference count is bumped and the same pointer is mapped into the new symtab.
// NOTE(review): this listing omits several original lines (remaining declarations, failure
// handling, the assignment of nst, the return); only the visible statements are described.
750 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
751 endpoint_t* ep; // an endpoint
752 rtable_ent_t* rte; // a route table entry
753 route_table_t* nrt; // new route table
754 void* sst; // source symtab
755 void* nst; // new symtab
756 thing_list_t things0; // things from space 0 (table entries)
757 thing_list_t things1; // things from space 1 (end points)
764 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
768 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// two scratch lists, each fixed at 2048 slots
// NOTE(review): the visible collect_things stores without testing nused against nalloc, so
// more than 2048 entries or endpoints would write past these arrays -- confirm.
773 things0.nalloc = 2048;
775 things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
776 if( things0.things == NULL ) {
782 things1.nalloc = 2048;
784 things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
785 if( things1.things == NULL ) {
791 sst = srt->hash; // convenience pointers (src symtab)
794 rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
795 rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
// share each entry with the new table; the extra reference keeps del_rte from freeing it early
797 for( i = 0; i < things0.nused; i++ ) {
798 rte = (rtable_ent_t *) things0.things[i];
799 rte->refs++; // rtes can be removed, so we track references
800 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
// endpoints are shared as well, keyed by name in class 1
803 for( i = 0; i < things1.nused; i++ ) {
804 ep = (endpoint_t *) things1.things[i];
805 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the scratch lists are released
808 free( things0.things );
809 free( things1.things );
814 Given a name, find the endpoint struct in the provided route table.
// Look up an endpoint by name in symtab class 1 of the given route table and return what
// rmr_sym_get yields for it.
// NOTE(review): the body of the guard below (presumably a nil return) and the closing braces
// are not part of this listing.
816 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
// a nil table, a table without a hash, or a nil/empty name cannot be looked up
818 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
822 return rmr_sym_get( rt->hash, ep_name, 1 );
826 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
// Drop a route table: del_rte is run over every class 0 (route table entry) thing in the
// table's symtab, then the symtab itself is freed. Endpoints are not freed.
// NOTE(review): any nil test on rt and the release of the table struct itself are on lines
// not part of this listing; parse_rt_rec passes ctx->old_rtable here, which may be nil.
828 static void uta_rt_drop( route_table_t* rt ) {
833 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
834 rmr_sym_free( rt->hash ); // free all of the hash related data
839 Look up and return the pointer to the endpoint struct matching the given name.
840 If not in the hash, a new endpoint is created, added to the hash. Should always
// Return the endpoint struct for ep_name from the given table, creating it and adding it to
// the table's symtab (class 1) when uta_get_ep does not find it.
// NOTE(review): the declaration, the bodies of the failure branches and the return
// statements are not part of this listing.
843 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
// a nil table or a nil/empty name is reported and cannot be processed
846 if( !rt || !ep_name || ! *ep_name ) {
847 fprintf( stderr, "[WARN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
852 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
853 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
854 fprintf( stderr, "[WARN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
// a new endpoint starts unconnected, with its address from uta_h2ip and its own copy of the name
// NOTE(review): the results of uta_h2ip and strdup are stored without a visible nil test.
859 ep->open = 0; // not connected
860 ep->addr = uta_h2ip( ep_name );
861 ep->name = strdup( ep_name );
862 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
864 rmr_sym_put( rt->hash, ep_name, 1, ep );
872 Given a session id and message type build a key that can be used to look up the rte in the route
873 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
// Build the 64 bit symtab key for a route table entry: the subscription id occupies the
// upper 32 bits and the message type the lower 32 bits. When sub_id equals UNSET_SUBID the
// upper 32 bits are all ones.
// NOTE(review): the declaration of key, the else line and the return are not part of this
// listing (the listing ends inside this function).
875 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
878 if( sub_id == UNSET_SUBID ) {
// no subscription id: upper half is all ones
879 key = 0xffffffff00000000 | mtype;
// subscription id in the upper half; mtype is masked to 32 bits before being or'd in
881 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);