3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
54 typedef struct thing_list {
60 // ---- debugging/testing -------------------------------------------------------------------------
63 Dump stats for an endpoint in the RT.
// Symtab foreach callback used for debugging: reports one endpoint on stderr.
// The visible code uses only thing (the endpoint) and vcounter (a caller
// supplied int counter); st, entry and name are not referenced in the lines shown.
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
// thing is really an endpoint_t; a nil thing is tested for before anything else
69 if( (ep = (endpoint_t *) thing) == NULL ) {
// the counter is optional: it is only touched when the caller passed a non-nil pointer
73 if( (counter = (int *) vcounter) != NULL ) {
// one line per endpoint: its name and its open flag
77 fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
81 Dump stats for a route entry in the table.
// Symtab foreach callback used for debugging: reports one route table entry on
// stderr. The visible code uses only thing (the rte) and vcounter (a caller
// supplied int counter).
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
85 rtable_ent_t* rte; // thing is really an rte
// a nil thing is tested for before anything else
89 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// the counter is optional: it is only touched when the caller passed a non-nil pointer
93 if( (counter = (int *) vcounter) != NULL ) {
// split the key for display: message type from the low bits, subscription id from the top 32 bits
// NOTE(review): build_rt_key() masks mtype with 0xffffffff, but only 16 bits are kept here, so
// message types above 65535 would be shown truncated -- confirm whether that is intended
97 mtype = rte->key & 0xffff;
98 sid = (int) (rte->key >> 32);
// NOTE(review): %016lx assumes the key is the width of unsigned long -- confirm for all targets
100 fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
104 Given a route table, cause some stats to be spit out.
106 static void rt_stats( route_table_t* rt ) {
110 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
114 counter = (int *) malloc( sizeof( int ) );
116 fprintf( stderr, "[DBUG] rtstats:\n" );
117 rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table
118 fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
121 rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries
122 fprintf( stderr, "[DBUG] %d entries\n", *counter );
128 // ------------------------------------------------------------------------------------------------
130 Little ditty to trim whitespace and trailing comments. Like shell, trailing comments
131 must be at the start of a word (i.e. must be immediately preceded by whitespace).
/*
	Trim whitespace and a trailing comment from a string, in place.

	Leading whitespace is skipped by advancing the returned pointer; trailing
	whitespace is removed by writing a nil into the buffer. As in the shell, a
	comment starts at a '#' which begins a word (is immediately preceded by
	whitespace); a '#' inside a word is ordinary data. Every '#' in the string
	is examined, so "a#b #note" yields "a#b".

	If the first non-blank character is '#', the string is handed back
	untouched (other than the skipped leading whitespace) so that the caller
	can recognise the comment line itself.

	buf must be a writable, nil terminated string. The pointer returned points
	into buf; it is not a new allocation.
*/
static char* clip( char* buf ) {
	char*	tok;		// candidate comment symbol
	char*	end;		// one past the last character that is kept

	while( *buf && isspace( (unsigned char) *buf ) ) {		// skip leading whitespace
		buf++;
	}

	if( *buf == '#' ) {
		return buf;						// just push back; leading comment sym is left for the caller
	}

	for( tok = strchr( buf, '#' ); tok != NULL; tok = strchr( tok + 1, '#' ) ) {
		if( isspace( (unsigned char) *(tok-1) ) ) {			// safe: buf[0] is not '#', so tok > buf
			*tok = 0;										// comment starts a word; chop it
			break;
		}
	}

	end = buf + strlen( buf );								// never steps below buf, even for an empty string
	while( end > buf && isspace( (unsigned char) *(end-1) ) ) {
		end--;												// trim trailing spaces too
	}
	*end = 0;

	return buf;
}
157 This accepts a pointer to a nil terminated string, and ensures that there is a
158 newline as the last character. If there is not, a new buffer is allocated and
159 the newline is added. If a new buffer is allocated, the buffer passed in is
160 freed. The function returns a pointer which the caller should use, and must
161 free. In the event of an error, a nil pointer is returned.
/*
	Accept a pointer to a nil terminated string and ensure that a newline is
	the last character.

	If the string already ends with a newline the same pointer is returned.
	Otherwise a new buffer holding the original contents plus a newline is
	allocated, the buffer passed in is freed, and the new buffer is returned.
	A nil or empty input yields a buffer containing just a newline.

	The pointer returned is the one the caller must use, and must free. On
	allocation failure nil is returned; the buffer passed in has still been
	freed, so the caller must not touch it again.

	A warning is written to stderr when non-empty input lacked the newline
	as that usually means the file was missing its final \n.
*/
static char* ensure_nlterm( char* buf ) {
	char*	nb;				// replacement buffer
	size_t	len;			// bytes in the caller's string

	len = buf != NULL ? strlen( buf ) : 0;

	if( len > 0 && buf[len-1] == '\n' ) {
		return buf;									// already terminated; nothing to do
	}

	if( len > 0 ) {
		fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
	}

	if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
		if( len > 0 ) {
			memcpy( nb, buf, len );					// short (even one byte) contents are kept, not dropped
		}
		*(nb+len) = '\n';							// insert \n and nil into the two extra bytes we allocated
		*(nb+len+1) = 0;
	}

	free( buf );									// replaced (or failed); either way the old buffer is done
	return nb;
}
190 Given a message type create a route table entry and add to the hash keyed on the
191 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
192 is the number of group slots to allocate in the entry.
// Create a route table entry with room for nrrgroups round robin groups and map
// it into rt's hash under key. An entry already mapped under the same key is
// pulled out and passed to del_rte() first.
//   rt        - table whose hash receives the entry
//   key       - numeric symtab key for the entry
//   nrrgroups - number of group slots wanted; values <= 0 get special handling
194 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
196 rtable_ent_t* old_rte; // entry which was already in the table for the key
// allocate the entry itself; failure is reported on stderr
202 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
203 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
// start from an all-zero entry so every field has a known value
206 memset( rte, 0, sizeof( *rte ) );
// a non-positive group count is not used as given
210 if( nrrgroups <= 0 ) {
// array of group pointers, one slot per group; failure is reported on stderr
214 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
215 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
// all slots start nil; groups are filled in later
219 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
220 rte->nrrgroups = nrrgroups;
// the new entry overlays any existing one for this key
222 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
223 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
226 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
// NOTE(review): %lx assumes uint64_t is unsigned long -- confirm for all targets
228 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
233 This accepts partially parsed information from a record sent by route manager or read from
235 ts_field is the msg-type,sender field
236 subid is the integer subscription id
237 rr_field is the endpoint information for round robining messages over
239 If all goes well, this will add an RTE to the table under construction.
241 The ts_field is checked to see if we should ingest this record. We ingest if one of
243 there is no sender info (a generic entry for all)
244 there is sender and our host:port matches one of the senders
245 the sender info is an IP address that matches one of our IP addresses
247 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
248 rtable_ent_t* rte; // route table entry added
251 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
255 int ngtoks; // number of tokens in the group list
256 int grp; // index into group list
258 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
259 rr_field = clip( rr_field );
261 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
262 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
263 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
265 key = build_rt_key( subid, atoi( ts_field ) );
267 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
269 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
270 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
272 for( grp = 0; grp < ngtoks; grp++ ) {
273 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
274 for( i = 0; i < ntoks; i++ ) {
275 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
276 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] );
277 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
284 if( DEBUG || (vlevel > 2) ) {
285 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
291 Trash_entry takes a partially parsed record from the input and
292 will delete the entry if the sender,mtype matches us or it's a
293 generic mtype. The reference in the new table is removed and the
294 refcounter for the actual rte is decreased. If that ref count is
295 0 then the memory is freed (handled by the del_rte call).
// Remove the route table entry named by a delete record from the table under
// construction, provided the record applies to this process.
//   ctx      - context; ctx->new_rtable and its hash must exist
//   ts_field - the msg-type[,sender...] field; clipped in place
//   subid    - subscription id used with the message type to build the key
//   vlevel   - verbosity; values above 1 enable the debug chatter
297 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
298 rtable_ent_t* rte; // route table entry to be 'deleted'
301 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
// nothing can be deleted unless a table is being built and it has a hash
304 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
308 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
// same acceptance test that build_entry() applies to the sender list
310 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
311 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
312 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
// atoi() reads the leading message type; any sender names after the comma are not part of the number
314 key = build_rt_key( subid, atoi( ts_field ) );
315 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
317 if( DEBUG || (vlevel > 1) ) {
318 fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
320 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
321 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
// reported when the key is not present in the table
323 if( DEBUG || (vlevel > 1) ) {
324 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
// reported when the sender list did not match this process
328 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
333 Parse a single record received from the route table generator, or read
334 from a static route table file. Start records cause a new table to
335 be started (if a partial table was received it is discarded). Table
336 entry records are added to the currently 'in progress' table, and an
337 end record causes the in progress table to be finalised and the
338 currently active table is replaced.
340 We expect one of several types:
342 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
343 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
// Parse one route table record and apply it to the tables held in ctx.
// The record is split on '|' and dispatched on the first character of the
// first field: blank and '#' records are recognised first, then 'd'
// (delete entry), 'n' (newrt start/end), 'm' (mse entry), 'r' (rte entry)
// and 'u' (update start/end).
//   ctx    - context holding rtable (active), old_rtable and new_rtable (being built)
//   buf    - the record; it is modified in place by tokenising and clipping
//   vlevel - verbosity; larger values cause more chatter on stderr
345 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
347 int ntoks; // number of tokens found in something
349 int grp; // group number
350 rtable_ent_t* rte; // route table entry added
352 char* gtokens[64]; // groups
353 char* tok; // pointer into a token or string
// NOTE(review): isspace() is handed a plain char here and below; a negative value is undefined behaviour
359 while( *buf && isspace( *buf ) ) { // skip leading whitespace
// NOTE(review): for an empty buf the start value is buf-1, one before the string -- confirm this is acceptable
362 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
// at most 128 '|' separated fields are accepted
365 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
366 switch( *(tokens[0]) ) {
367 case 0: // ignore blanks
369 case '#': // and comment lines
372 case 'd': // del | [sender,]mtype | sub-id
373 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
378 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
// field 1 is the type/sender list, field 2 the subscription id
382 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
383 ctx->new_rtable->updates++;
386 case 'n': // newrt|{start|end}
// NOTE(review): tokens[1] is used with no visible check that ntoks >= 2 -- confirm a bare "newrt" record is safe
387 tokens[1] = clip( tokens[1] );
388 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
389 if( ctx->new_rtable ) {
// rotate the tables: old is dropped, active becomes old, new becomes active
390 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
391 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
392 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
393 ctx->new_rtable = NULL;
394 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
397 fprintf( stderr, "[DBUG] old route table:\n" );
398 rt_stats( ctx->old_rtable );
399 fprintf( stderr, "[DBUG] new route table:\n" );
400 rt_stats( ctx->rtable );
403 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
404 ctx->new_rtable = NULL;
// anything other than "end" in field 1 is treated as a start
406 } else { // start a new table.
407 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
408 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
409 uta_rt_drop( ctx->new_rtable );
// a full replacement keeps only the endpoints from the active table; entries are rebuilt from the records
413 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
415 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
417 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
421 case 'm': // assume mse entry
422 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
427 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
// mse field order: type/sender list, subscription id, endpoint groups
431 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
432 ctx->new_rtable->updates++;
435 case 'r': // assume rt entry
436 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
// NOTE(review): tokens[2] is used below and the only visible count check is ntoks > 3 -- confirm short records are rejected
440 ctx->new_rtable->updates++;
// rte field order: type/sender list, endpoint groups, then an optional subscription id
441 if( ntoks > 3 ) { // assume new entry with subid last
442 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
444 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
448 case 'u': // update current table, not a total replacement
// NOTE(review): tokens[1] is used with no visible check that ntoks >= 2 -- confirm
449 tokens[1] = clip( tokens[1] );
450 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
451 if( ctx->new_rtable == NULL ) { // update table not in progress
// field 2 of the end record carries the number of records the sender put in the update
456 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
457 fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
458 ctx->new_rtable->updates, tokens[2] );
// a miscounted update is discarded in full; the active table is left alone
459 uta_rt_drop( ctx->new_rtable );
460 ctx->new_rtable = NULL;
465 if( ctx->new_rtable ) {
// same rotation as for a full table: old is dropped, active becomes old, new becomes active
466 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
467 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
468 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
469 ctx->new_rtable = NULL;
470 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
473 fprintf( stderr, "[DBUG] old route table:\n" );
474 rt_stats( ctx->old_rtable );
475 fprintf( stderr, "[DBUG] updated route table:\n" );
476 rt_stats( ctx->rtable );
479 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
480 ctx->new_rtable = NULL;
482 } else { // start a new table.
483 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
484 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
485 uta_rt_drop( ctx->new_rtable );
// an update starts from everything in the active table so unmentioned entries survive
489 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
491 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
// NOTE(review): new_rtable is dereferenced with no visible nil check on the clone/init result -- confirm
494 ctx->new_rtable->updates = 0; // init count of updates received
495 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
// any other leading character is not a record type we know
500 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
507 This function attempts to open a static route table in order to create a 'seed'
508 table during initialisation. The environment variable RMR_SEED_RT is expected
509 to contain the necessary path to the file. If missing, or if the file is empty,
510 no route table will be available until one is received from the generator.
512 This function is probably most useful for testing situations, or extreme
513 cases where the routes are static.
// Load a seed route table from the file named by the ENV_SEED_RT environment
// variable and feed it, one record per line, to parse_rt_rec().
//   ctx    - context handed through to parse_rt_rec()
//   vlevel - verbosity handed through to parse_rt_rec()
515 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
518 char* fbuf; // buffer with file contents
519 char* rec; // start of the record
520 char* eor; // end of the record
521 int rcount = 0; // record count for debug
// no variable set means there is no seed table to read
523 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
// uta_fib() reads the whole file; ensure_nlterm() guarantees the final record ends with a newline
527 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
// NOTE(review): errno is reported here, so both helpers must leave a meaningful value in it on failure -- confirm
528 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
532 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
// first pass: normalise record terminators before splitting into records
533 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
535 *eor = '\n'; // will look like a blank line which is ok
// second pass: walk the buffer a record at a time; each step resumes just past the previous newline
539 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
541 if( (eor = strchr( rec, '\n' )) != NULL ) {
// a record with no newline means the table is not usable; both warnings name the file
544 fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
545 fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
550 parse_rt_rec( ctx, rec, vlevel );
553 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully parsed: %d records\n", rcount );
558 Callback driven for each named thing in a symtab. We collect the pointers to those
559 things for later use (cloning).
561 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
564 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
568 if( thing == NULL ) {
572 tl->things[tl->nused++] = thing; // save a reference to the thing
576 Called to delete a route table entry struct. We delete the array of endpoint
577 pointers, but NOT the endpoints referenced as those are referenced from
580 Route table entries can be concurrently referenced by multiple symtabs, so
581 the actual delete happens only if decrementing the rte's ref count takes it
582 to 0. Thus, it is safe to call this function across a symtab when cleaning up
583 the symtab, or overlaying an entry.
585 This function uses ONLY the pointer to the rte (thing) and ignores the other
586 information that symtab foreach function passes (st, entry, and data) which
587 means that it _can_ safely be used outside of the foreach setting. If
588 the function is changed to depend on any of these three, then a stand-alone
589 rte_cleanup() function should be added and referenced by this, and references
590 to this outside of the foreach world should be changed.
// Release a route table entry. Usable both as a symtab foreach callback and as
// a direct call (other functions in this file pass NULL for everything except thing).
//   thing - the rtable_ent_t to release; a nil thing is tested for first
592 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
596 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// entries can be shared between tables; one that is still referenced is not torn down
601 if( rte->refs > 0 ) { // something still referencing, so it lives
605 if( rte->rrgroups ) { // clean up the round robin groups
606 for( i = 0; i < rte->nrrgroups; i++ ) {
// slots are zeroed when the entry is created, so unused groups are nil and are skipped
607 if( rte->rrgroups[i] ) {
// NOTE(review): only the epts list is visibly freed here; confirm the group struct itself (rrgroups[i]) is also released
608 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
// the array of group pointers
612 free( rte->rrgroups );
615 free( rte ); // finally, drop the potato
619 Read an entire file into a buffer. We assume for route table files
620 they will be smallish and so this won't be a problem.
621 Returns a pointer to the buffer, or nil. Caller must free.
622 Terminates the buffer with a nil character for string processing.
624 If we cannot stat the file, we assume it's empty or missing and return
625 an empty buffer, as opposed to a nil, so the caller can generate defaults
626 or error if an empty/missing file isn't tolerated.
/*
	Read an entire file into a buffer. We assume that route table files
	will be smallish and so this won't be a problem.

	Returns a pointer to a nil terminated buffer which the caller must free,
	or nil on error.

	If the file cannot be opened, or is empty, an empty string (not nil) is
	returned so the caller can generate defaults or treat an empty/missing
	file as an error as it sees fit. If the file opens but cannot be stat'd,
	at most 8k is read.

	read() is allowed to return fewer bytes than asked for, so we keep
	reading until the expected size has arrived or end of file is hit; an
	interrupted read is retried. When nil is returned errno describes the
	failure: ENOMEM if the buffer could not be allocated, otherwise the
	value that read() set.
*/
static char* uta_fib( char* fname ) {
	struct stat	stats;
	off_t		fsize = 8192;		// bytes we expect; this default is used when stat fails
	off_t		total = 0;			// bytes read so far
	ssize_t		nread;				// bytes from the most recent read
	int			fd;
	int			saved_errno;
	char*		buf;				// input buffer

	if( (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {					// empty file
				close( fd );
				fd = -1;
			} else {
				fsize = stats.st_size;					// stat ok, save the file size
			}
		}												// stat failed: leave the file open and read up to the default
	}

	if( fd < 0 ) {										// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
			return NULL;
		}

		*buf = 0;
		return buf;
	}

	if( (buf = (char *) malloc( (size_t) fsize + 2 )) == NULL ) {		// enough to add nil char to make string
		close( fd );
		errno = ENOMEM;
		return NULL;
	}

	while( total < fsize ) {
		nread = read( fd, buf + total, (size_t) (fsize - total) );
		if( nread < 0 ) {
			if( errno == EINTR ) {						// interrupted before any data; just go again
				continue;
			}

			saved_errno = errno;						// free/close may step on it
			free( buf );
			close( fd );
			errno = saved_errno;
			return NULL;
		}

		if( nread == 0 ) {								// end of file; the file was shorter than expected
			break;
		}

		total += nread;
	}

	buf[total] = 0;

	close( fd );
	return buf;
}
680 Create and initialise a route table; Returns a pointer to the table struct.
// Allocate an empty route table together with its symtab.
// Takes no arguments; each of the two allocations is tested for failure.
682 static route_table_t* uta_rt_init( ) {
// the table struct itself
685 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
// the symtab which holds both the entries and the endpoints
689 if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
698 Clone (sort of) an existing route table. This is done to preserve the endpoint
699 names referenced in a table (and thus existing sessions) when a new set
700 of message type to endpoint name mappings is received. A new route table
701 with only endpoint name references is returned based on the active table in
// Build a new route table which references the endpoints (symtab class 1) of
// srt; route table entries are not copied by the visible code.
//   srt - the table whose endpoints are to be carried over
704 static route_table_t* uta_rt_clone( route_table_t* srt ) {
705 endpoint_t* ep; // an endpoint
706 route_table_t* nrt; // new route table
707 void* sst; // source symtab
708 void* nst; // new symtab
// the new table struct and its symtab; each allocation is tested for failure
716 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
720 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// scratch list for the endpoint pointers; fixed capacity
// NOTE(review): confirm collect_things() cannot store more than nalloc pointers when the source table is larger
725 things.nalloc = 2048;
727 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
728 if( things.things == NULL ) {
734 sst = srt->hash; // convenience pointers (src symtab)
737 rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
// the same endpoint structs are put into the new symtab under their names; they are shared, not duplicated
739 for( i = 0; i < things.nused; i++ ) {
740 ep = (endpoint_t *) things.things[i];
741 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the scratch array is released; the endpoints now belong to both tables
744 free( things.things );
749 Clones _all_ of the given route table (references both endpoints AND the route table
750 entries. Needed to support a partial update where some route table entries will not
751 be deleted if not explicitly in the update.
// Build a new route table which references both the route table entries
// (symtab class 0) and the endpoints (class 1) of srt. Each entry's reference
// count is bumped as it is now reachable from two tables.
//   srt - the table to clone
753 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
754 endpoint_t* ep; // an endpoint
755 rtable_ent_t* rte; // a route table entry
756 route_table_t* nrt; // new route table
757 void* sst; // source symtab
758 void* nst; // new symtab
759 thing_list_t things0; // things from space 0 (table entries)
760 thing_list_t things1; // things from space 1 (end points)
// the new table struct and its symtab; each allocation is tested for failure
767 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
771 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// scratch lists, one per symtab class; fixed capacity
// NOTE(review): confirm collect_things() cannot store more than nalloc pointers when the source table is larger
776 things0.nalloc = 2048;
778 things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
779 if( things0.things == NULL ) {
785 things1.nalloc = 2048;
787 things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
788 if( things1.things == NULL ) {
794 sst = srt->hash; // convenience pointers (src symtab)
797 rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
798 rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
// entries are shared between the tables, so each one gains a reference before being mapped
800 for( i = 0; i < things0.nused; i++ ) {
801 rte = (rtable_ent_t *) things0.things[i];
802 rte->refs++; // rtes can be removed, so we track references
803 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
// endpoints are shared too, put into the new symtab under their names
806 for( i = 0; i < things1.nused; i++ ) {
807 ep = (endpoint_t *) things1.things[i];
808 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the scratch arrays are released
811 free( things0.things );
812 free( things1.things );
817 Given a name, find the endpoint struct in the provided route table.
// Look up an endpoint by name in the given route table.
//   rt      - table to search; it and its hash must be non-nil
//   ep_name - endpoint name; must be a non-empty string
// Returns whatever rmr_sym_get() finds for the name in symtab class 1.
819 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
// nothing to search, or nothing to search for
821 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
// endpoints are kept in class 1 of the symtab, keyed by name
825 return rmr_sym_get( rt->hash, ep_name, 1 );
829 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
// Discard a route table: every class 0 thing (route table entry) is passed to
// del_rte() and then the symtab is freed.
//   rt - the table to drop
// NOTE(review): parse_rt_rec() calls this with ctx->old_rtable, which may be nil; confirm a nil rt is tolerated
831 static void uta_rt_drop( route_table_t* rt ) {
836 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
837 rmr_sym_free( rt->hash ); // free all of the hash related data
842 Look up and return the pointer to the endpoint struct matching the given name.
843 If not in the hash, a new endpoint is created, added to the hash. Should always
// Find the endpoint with the given name in the route table, creating it and
// adding it to the table's symtab (class 1) when it is not already there.
//   rt      - table to search/add to
//   ep_name - endpoint name; must be a non-empty string
846 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
// a nil table or a nil/empty name is an internal error and is reported
849 if( !rt || !ep_name || ! *ep_name ) {
850 fprintf( stderr, "[WRN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
855 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
856 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
857 fprintf( stderr, "[WRN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
// a new endpoint starts unconnected; addr comes from uta_h2ip() applied to the name
862 ep->open = 0; // not connected
863 ep->addr = uta_h2ip( ep_name );
// the endpoint keeps its own copy of the name
// NOTE(review): the strdup() result is not visibly checked for nil -- confirm
864 ep->name = strdup( ep_name );
865 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
// register under the name so later lookups and clones find it
867 rmr_sym_put( rt->hash, ep_name, 1, ep );
875 Given a session id and message type build a key that can be used to look up the rte in the route
876 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
878 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
881 if( sub_id == UNSET_SUBID ) {
882 key = 0xffffffff00000000 | mtype;
884 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);