3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
/*
	List of opaque pointers filled in by a symtab foreach callback. Code in
	this file sets nalloc to the capacity of the things array before a walk
	and reads nused afterwards as the number of pointers that were stored.
*/
54 typedef struct thing_list {
60 // ---- debugging/testing -------------------------------------------------------------------------
63 Dump stats for an endpoint in the RT.
/*
	Symtab foreach callback (driven from rt_stats) which reports one endpoint
	on stderr.

	thing		is treated as a pointer to an endpoint_t and is tested for nil.
	vcounter	is treated as a pointer to an int when it is not nil.
	st, entry and name are not referenced by the visible code.
*/
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
69 if( (ep = (endpoint_t *) thing) == NULL ) {
// presumably the caller's counter is bumped inside this test so rt_stats can print a total -- TODO confirm
73 if( (counter = (int *) vcounter) != NULL ) {
77 fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
81 Dump stats for a route entry in the table.
/*
	Symtab foreach callback (driven from rt_stats) which reports one route
	table entry on stderr: its key, the message type and subscription id
	unpacked from the key, the number of round robin groups and the ref count.

	thing		is treated as a pointer to an rtable_ent_t and is tested for nil.
	vcounter	is treated as a pointer to an int when it is not nil.
	st, entry and name are not referenced by the visible code.
*/
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
85 rtable_ent_t* rte; // thing is really an rte
89 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// presumably the caller's counter is bumped inside this test so rt_stats can print a total -- TODO confirm
93 if( (counter = (int *) vcounter) != NULL ) {
// message type is taken from the low 16 bits of the key, the sub id from the upper 32
97 mtype = rte->key & 0xffff;
98 sid = (int) (rte->key >> 32);
100 fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
104 Given a route table, cause some stats to be spit out.
106 static void rt_stats( route_table_t* rt ) {
110 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
114 counter = (int *) malloc( sizeof( int ) );
116 fprintf( stderr, "[DBUG] rtstats:\n" );
117 rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table
118 fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
121 rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries
122 fprintf( stderr, "[DBUG] %d entries\n", *counter );
128 // ------------------------------------------------------------------------------------------------
/*
	Little ditty to trim whitespace and trailing comments. Like shell, a
	trailing comment must start a word (i.e. the '#' must be immediately
	preceded by whitespace); a '#' embedded in a word is kept. When the first
	non-blank character is '#' the string is pushed back untouched (other than
	the skipped leading whitespace) as callers recognise leading comment
	symbols themselves.

	The buffer is modified in place. The pointer returned refers to the first
	non-blank character inside buf; it is not a new allocation.
*/
static char* clip( char* buf ) {
	char*	tok;

	while( *buf && isspace( (unsigned char) *buf ) ) {		// skip leading whitespace
		buf++;
	}

	if( *buf == '#' ) {
		return buf;						// just push back; leading comment sym handled by the caller
	}

	// buf[0] is not '#', so any '#' found is at index 1 or later and tok-1 is always inside the string
	for( tok = strchr( buf, '#' ); tok != NULL; tok = strchr( tok + 1, '#' ) ) {
		if( isspace( (unsigned char) *(tok-1) ) ) {			// starts a word, so it starts the comment
			*tok = 0;
			break;
		}
	}

	tok = buf + strlen( buf );								// one past the last character; safe for an empty string
	while( tok > buf && isspace( (unsigned char) *(tok-1) ) ) {		// trim trailing spaces too
		tok--;
	}
	*tok = 0;

	return buf;
}
/*
	This accepts a pointer to a nil terminated string, and ensures that there is a
	newline as the last character. If there is not, a new buffer is allocated which
	holds the original content followed by the newline; a nil or empty input yields
	a buffer containing just a newline.

	Ownership: buf must be a malloc'd buffer (or nil). When the string is already
	newline terminated buf itself is returned. Otherwise buf is freed and the new
	buffer is returned. In both cases the caller uses, and later frees, only the
	pointer returned. In the event of an allocation error a nil pointer is
	returned (buf has still been freed).
*/
static char* ensure_nlterm( char* buf ) {
	char*	nb;						// new buffer when one is needed
	size_t	len;					// length of the string passed in

	len = buf != NULL ? strlen( buf ) : 0;
	if( len > 0 && buf[len-1] == '\n' ) {
		return buf;					// already terminated; nothing to do
	}

	if( len > 0 ) {					// an empty/missing buffer is not worth a warning
		fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
	}

	if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
		if( len > 0 ) {
			memcpy( nb, buf, len );
		}
		*(nb+len) = '\n';			// insert \n and nil into the two extra bytes we allocated
		*(nb+len+1) = 0;
	}

	free( buf );					// caller only ever sees the replacement (free of nil is harmless)
	return nb;
}
191 Given a message type create a route table entry and add to the hash keyed on the
192 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
193 is the number of group slots to allocate in the entry.
/*
	Create a route table entry for key and place it in the table's symtab.

	rt			route table whose hash receives the entry
	key			numeric symtab key for the entry
	nrrgroups	number of round robin group slots to allocate; values of zero
				or less are special cased before the allocation

	The new entry and its group pointer array are both zeroed. An entry which
	is already mapped to key is handed to del_rte before the new entry is
	mapped. Both allocation failures are reported on stderr.
*/
195 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
197 rtable_ent_t* old_rte; // entry which was already in the table for the key
203 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
204 fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
207 memset( rte, 0, sizeof( *rte ) );
// presumably a default group count is substituted for a non-positive request -- TODO confirm the value
211 if( nrrgroups <= 0 ) {
215 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
216 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
220 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
221 rte->nrrgroups = nrrgroups;
// an existing entry for the key is released first so that the map below overlays it
223 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
224 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
227 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
229 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
234 This accepts partially parsed information from a record sent by route manager or read from
236 ts_field is the msg-type,sender field
237 subid is the integer subscription id
238 rr_field is the endpoint information for round robening message over
240 If all goes well, this will add an RTE to the table under construction.
242 The ts_field is checked to see if we should ingest this record. We ingest if one of
244 there is no sender info (a generic entry for all)
245 there is sender and our host:port matches one of the senders
246 the sender info is an IP address that matches one of our IP addresses
248 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
249 rtable_ent_t* rte; // route table entry added
252 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
256 int ngtoks; // number of tokens in the group list
257 int grp; // index into group list
259 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
260 rr_field = clip( rr_field );
262 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
263 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
264 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
266 key = build_rt_key( subid, atoi( ts_field ) );
268 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
270 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
271 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
273 for( grp = 0; grp < ngtoks; grp++ ) {
274 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
275 for( i = 0; i < ntoks; i++ ) {
276 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
277 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] );
278 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
285 if( DEBUG || (vlevel > 2) ) {
286 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
292 Trash_entry takes a partially parsed record from the input and
293 will delete the entry if the sender,mtype matches us or it's a
294 generic mtype. The reference in the new table is removed and the
295 refcounter for the actual rte is decreased. If that ref count is
296 0 then the memory is freed (handled by the del_rte call).
/*
	Remove a route table entry from the table under construction.

	ctx			context; nothing is done unless it has a new_rtable with a hash
	ts_field	the msg-type,sender field of the record; clipped in place
	subid		subscription id used with the message type to build the key
	vlevel		verbosity level; values above 1 enable the diagnostics here

	The same acceptance test as build_entry is applied: the record is acted on
	when ts_field has no sender list, when our name is in the list, or when
	the list has one of our IP addresses. Otherwise the record is skipped.
*/
298 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
299 rtable_ent_t* rte; // route table entry to be 'deleted'
302 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
305 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
309 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
311 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
312 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
313 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
// atoi stops at the first non-digit, so only the leading message type of ts_field contributes to the key
315 key = build_rt_key( subid, atoi( ts_field ) );
316 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
// presumably the delete below happens only when the pull found an entry -- TODO confirm
318 if( DEBUG || (vlevel > 1) ) {
319 fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
321 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
322 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
324 if( DEBUG || (vlevel > 1) ) {
325 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
329 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
334 Parse a single record received from the route table generator, or read
335 from a static route table file. Start records cause a new table to
336 be started (if a partial table was received it is discarded). Table
337 entry records are added to the currently 'in progress' table, and an
338 end record causes the in progress table to be finalised and the
339 currently active table is replaced.
341 We expect one of several types:
343 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
344 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
/*
	Parse one route table record and apply it to the context.

	ctx		context holding the active (rtable), previous (old_rtable) and
			in progress (new_rtable) tables
	buf		the record; it is trimmed and tokenised in place on '|'
	vlevel	verbosity level controlling the diagnostics written to stderr

	The first character of the first token selects the action:
		nil, '#'	blank and comment records
		'd'			delete an entry from the in progress table (trash_entry)
		'n'			newrt start/end: start a table, or make the in progress table active
		'm'			mse record: build an entry with an explicit sub id
		'r'			rte record: build an entry; a fourth field, if present, is the sub id
		'u'			update start/end: like newrt, but the start clones entries as
					well as endpoints, and the end record's count is checked
					against the number of updates received
	Anything else is reported as unrecognised when DEBUG is set.
*/
346 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
348 int ntoks; // number of tokens found in something
350 int grp; // group number
351 rtable_ent_t* rte; // route table entry added
353 char* gtokens[64]; // groups
354 char* tok; // pointer into a token or string
360 while( *buf && isspace( *buf ) ) { // skip leading whitespace
363 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
366 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
367 switch( *(tokens[0]) ) {
368 case 0: // ignore blanks
370 case '#': // and comment lines
373 case 'd': // del | [sender,]mtype | sub-id
374 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
379 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
383 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
384 ctx->new_rtable->updates++;
// NOTE(review): no field count check is visible before tokens[1] is used in this case -- confirm ntoks >= 2 is guaranteed
387 case 'n': // newrt|{start|end}
388 tokens[1] = clip( tokens[1] );
389 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
390 if( ctx->new_rtable ) {
// rotate: previous old table is dropped, active becomes old, in progress becomes active
391 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
392 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
393 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
394 ctx->new_rtable = NULL;
395 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
398 fprintf( stderr, "[DBUG] old route table:\n" );
399 rt_stats( ctx->old_rtable );
400 fprintf( stderr, "[DBUG] new route table:\n" );
401 rt_stats( ctx->rtable );
404 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
405 ctx->new_rtable = NULL;
407 } else { // start a new table.
408 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
409 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
410 uta_rt_drop( ctx->new_rtable );
// a full replacement keeps only the endpoints of the active table; entries are rebuilt from the records
414 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
416 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
418 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
422 case 'm': // assume mse entry
423 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
428 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
// mse field order is mtype[,sender] | sub-id | endpoint groups
432 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
433 ctx->new_rtable->updates++;
// NOTE(review): no field count check is visible before tokens[1] and tokens[2] are used in this case -- confirm
436 case 'r': // assume rt entry
437 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
441 ctx->new_rtable->updates++;
442 if( ntoks > 3 ) { // assume new entry with subid last
443 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
445 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
// NOTE(review): tokens[1] here, and tokens[2] in the "end" branch, are used without a visible field count check -- confirm
449 case 'u': // update current table, not a total replacement
450 tokens[1] = clip( tokens[1] );
451 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
452 if( ctx->new_rtable == NULL ) { // update table not in progress
// the end record carries the sender's record count; a mismatch discards the whole update
457 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
458 fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
459 ctx->new_rtable->updates, tokens[2] );
460 uta_rt_drop( ctx->new_rtable );
461 ctx->new_rtable = NULL;
466 if( ctx->new_rtable ) {
467 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
468 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
469 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
470 ctx->new_rtable = NULL;
471 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
474 fprintf( stderr, "[DBUG] old route table:\n" );
475 rt_stats( ctx->old_rtable );
476 fprintf( stderr, "[DBUG] updated route table:\n" );
477 rt_stats( ctx->rtable );
480 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
481 ctx->new_rtable = NULL;
483 } else { // start a new table.
484 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
485 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
486 uta_rt_drop( ctx->new_rtable );
// an update starts from everything in the active table so that untouched entries survive
490 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
492 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
// NOTE(review): uta_rt_init and the clone functions have visible allocation checks; confirm new_rtable cannot be nil here
495 ctx->new_rtable->updates = 0; // init count of updates received
496 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
501 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
508 This function attempts to open a static route table in order to create a 'seed'
509 table during initialisation. The environment variable RMR_SEED_RT is expected
510 to contain the necessary path to the file. If missing, or if the file is empty,
511 no route table will be available until one is received from the generator.
513 This function is probably most useful for testing situations, or extreme
514 cases where the routes are static.
/*
	Load a seed route table from the file named by the ENV_SEED_RT environment
	variable and feed it, one record at a time, to parse_rt_rec.

	ctx		context which receives the parsed table
	vlevel	verbosity level passed through to parse_rt_rec

	The file is read into one buffer (uta_fib) which is forced to end with a
	newline (ensure_nlterm). Carriage returns are turned into newlines, and
	the buffer is then walked newline to newline. A record without a newline
	terminator causes the table to be reported as malformed and not used.
*/
516 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
519 char* fbuf; // buffer with file contents
520 char* rec; // start of the record
521 char* eor; // end of the record
522 int rcount = 0; // record count for debug
524 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
528 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
529 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
533 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
534 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
536 *eor = '\n'; // will look like a blank line which is ok
// rec steps to the byte after each newline; the loop ends at the terminating nil
540 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
542 if( (eor = strchr( rec, '\n' )) != NULL ) {
545 fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
546 fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
551 parse_rt_rec( ctx, rec, vlevel );
554 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully parsed: %d records\n", rcount );
559 Callback driven for each named thing in a symtab. We collect the pointers to those
560 things for later use (cloning).
562 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
565 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
569 if( thing == NULL ) {
573 tl->things[tl->nused++] = thing; // save a reference to the thing
577 Called to delete a route table entry struct. We delete the array of endpoint
578 pointers, but NOT the endpoints referenced as those are referenced from
581 Route table entries can be concurrently referenced by multiple symtabs, so
582 the actual delete happens only if decrementing the rte's ref count takes it
583 to 0. Thus, it is safe to call this function across a symtab when cleaning up
584 the symtab, or overlaying an entry.
586 This function uses ONLY the pointer to the rte (thing) and ignores the other
587 information that symtab foreach function passes (st, entry, and data) which
588 means that it _can_ safely be used outside of the foreach setting. If
589 the function is changed to depend on any of these three, then a stand-alone
590 rte_cleanup() function should be added and referenced by this, and references
591 to this outside of the foreach world should be changed.
/*
	Release a route table entry; usable as a symtab foreach callback or called
	directly with only thing supplied.

	thing	the rte; a nil pointer is tested for first
	st, entry, name and data are not referenced by the visible code.

	While rte->refs is above zero the entry is left alone. Otherwise the
	endpoint pointer list of every round robin group is freed, then the group
	array, and finally the rte. The endpoints referenced are not freed.
*/
593 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
597 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// NOTE(review): the ref count decrement presumably happens just before this test -- confirm
602 if( rte->refs > 0 ) { // something still referencing, so it lives
606 if( rte->rrgroups ) { // clean up the round robin groups
607 for( i = 0; i < rte->nrrgroups; i++ ) {
608 if( rte->rrgroups[i] ) {
609 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
// NOTE(review): only the epts list is visibly freed; confirm the group struct itself is released too
613 free( rte->rrgroups );
616 free( rte ); // finally, drop the potato
/*
	Read an entire file into a buffer. We assume for route table files
	they will be smallish and so this won't be a problem.
	Returns a pointer to the buffer, or nil. Caller must free.
	Terminates the buffer with a nil character for string processing.

	If the file cannot be opened, or it is empty, we return an empty buffer,
	as opposed to a nil, so the caller can generate defaults or error if an
	empty/missing file isn't tolerated. The same is done for a nil fname. If
	the file opens but cannot be stat'd, up to 8k is read.

	A nil return means that memory could not be allocated (errno is ENOMEM
	for the main buffer) or that a read failed (errno is left as set by read).
	Reading continues until the expected size has been read or end of file is
	reached, so a short read does not truncate the result.
*/
static char* uta_fib( char* fname ) {
	struct stat	stats;
	off_t		fsize = 8192;	// size of the file; default used when stat fails
	off_t		nread = 0;		// number of bytes read so far
	ssize_t		got;			// bytes returned by a single read
	int			fd = -1;
	int			state;			// errno saved across cleanup calls
	char*		buf;			// input buffer

	if( fname != NULL && (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {					// empty file
				close( fd );
				fd = -1;
			} else {
				fsize = stats.st_size;					// stat ok, save the file size
			}
		}												// stat failed: leave the file open and try to read the default max of 8k
	}

	if( fd < 0 ) {										// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
			return NULL;
		}

		*buf = 0;
		return buf;
	}

	// add a size limit check here

	if( (buf = (char *) malloc( sizeof( char ) * (size_t) fsize + 2 )) == NULL ) {		// enough to add nil char to make string
		close( fd );
		errno = ENOMEM;
		return NULL;
	}

	while( nread < fsize ) {							// read may legitimately return less than asked for
		got = read( fd, buf + nread, (size_t) (fsize - nread) );
		if( got < 0 ) {
			if( errno == EINTR ) {						// interrupted before any data; just try again
				continue;
			}

			state = errno;								// free/close might change it; caller reports it
			free( buf );
			close( fd );
			errno = state;
			return NULL;
		}

		if( got == 0 ) {								// end of file before the expected size
			break;
		}

		nread += got;
	}

	buf[nread] = 0;

	close( fd );
	return buf;
}
681 Create and initialise a route table; Returns a pointer to the table struct.
/*
	Allocate a route table struct and a symtab (509 slots requested) for its
	hash. Both allocations are checked for failure.
	NOTE(review): presumably nil is returned, and the table struct released,
	when either allocation fails -- confirm.
*/
683 static route_table_t* uta_rt_init( ) {
686 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
690 if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
699 Clone (sort of) an existing route table. This is done to preserve the endpoint
700 names referenced in a table (and thus existing sessions) when a new set
701 of message type to endpoint name mappings is received. A new route table
702 with only endpoint name references is returned based on the active table in
/*
	Build a new route table which references the endpoints (symtab class 1) of
	the source table and nothing else.

	srt		the source table; its hash is walked to collect the endpoints

	The endpoint structs are shared, not copied: each collected pointer is
	placed in the new symtab under the endpoint's name. The temporary list
	used to collect them holds at most 2048 pointers and is freed before
	returning.
*/
705 static route_table_t* uta_rt_clone( route_table_t* srt ) {
706 endpoint_t* ep; // an endpoint
707 route_table_t* nrt; // new route table
708 void* sst; // source symtab
709 void* nst; // new symtab
717 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
721 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// fixed capacity for the collection pass
726 things.nalloc = 2048;
728 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
729 if( things.things == NULL ) {
// NOTE(review): no nil check of srt is visible before this dereference -- confirm one exists above
735 sst = srt->hash; // convenience pointers (src symtab)
738 rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
740 for( i = 0; i < things.nused; i++ ) {
741 ep = (endpoint_t *) things.things[i];
742 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
745 free( things.things );
750 Clones _all_ of the given route table (references both endpoints AND the route table
751 entries. Needed to support a partial update where some route table entries will not
752 be deleted if not explicitly in the update.
/*
	Build a new route table which references both the route table entries
	(symtab class 0) and the endpoints (symtab class 1) of the source table.

	srt		the source table; its hash is walked once per class

	Nothing is copied. Each rte has its ref count bumped and is mapped into
	the new symtab under its own key; each endpoint is placed in the new
	symtab under its name. The two temporary lists hold at most 2048 pointers
	each and are freed before returning.
*/
754 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
755 endpoint_t* ep; // an endpoint
756 rtable_ent_t* rte; // a route table entry
757 route_table_t* nrt; // new route table
758 void* sst; // source symtab
759 void* nst; // new symtab
760 thing_list_t things0; // things from space 0 (table entries)
761 thing_list_t things1; // things from space 1 (end points)
768 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
772 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// fixed capacity for each collection pass
777 things0.nalloc = 2048;
779 things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
780 if( things0.things == NULL ) {
786 things1.nalloc = 2048;
788 things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
// NOTE(review): confirm things0.things is released on this failure path
789 if( things1.things == NULL ) {
// NOTE(review): no nil check of srt is visible before this dereference -- confirm one exists above
795 sst = srt->hash; // convenience pointers (src symtab)
798 rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
799 rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
801 for( i = 0; i < things0.nused; i++ ) {
802 rte = (rtable_ent_t *) things0.things[i];
803 rte->refs++; // rtes can be removed, so we track references
804 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
807 for( i = 0; i < things1.nused; i++ ) {
808 ep = (endpoint_t *) things1.things[i];
809 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
812 free( things0.things );
813 free( things1.things );
818 Given a name, find the endpoint struct in the provided route table.
/*
	Look up an endpoint by name in the route table's symtab (class 1).

	rt			route table to search; it and its hash must not be nil
	ep_name		name to look for; must not be nil or empty

	Returns whatever rmr_sym_get yields for the name. The arguments are
	validated first; presumably nil is returned when that check fails -- TODO
	confirm.
*/
820 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
822 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
826 return rmr_sym_get( rt->hash, ep_name, 1 );
830 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
/*
	Drop a route table: del_rte is driven for every class 0 thing (the route
	table entries) in the table's hash, and then the symtab itself is freed.
	Endpoints are not freed here.
	NOTE(review): callers in this file pass ctx->old_rtable which may be nil;
	confirm a nil check on rt precedes the first use below.
*/
832 static void uta_rt_drop( route_table_t* rt ) {
837 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
838 rmr_sym_free( rt->hash ); // free all of the hash related data
843 Look up and return the pointer to the endpoint struct matching the given name.
844 If not in the hash, a new endpoint is created, added to the hash. Should always
/*
	Find the endpoint with the given name in the route table, creating it and
	adding it to the table's symtab (class 1) when it is not there.

	rt			route table to search and, if needed, add to
	ep_name		endpoint name; must not be nil or empty

	A nil rt, or a nil or empty name, is reported on stderr, as is an
	allocation failure. A new endpoint is marked not open, has its address
	set from uta_h2ip, holds its own copy of the name, and has its gate mutex
	initialised with default attributes.
*/
847 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
850 if( !rt || !ep_name || ! *ep_name ) {
851 fprintf( stderr, "[WRN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
856 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
857 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
858 fprintf( stderr, "[WRN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
863 ep->open = 0; // not connected
864 ep->addr = uta_h2ip( ep_name );
// NOTE(review): the result of strdup is not visibly checked for nil
865 ep->name = strdup( ep_name );
866 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
868 rmr_sym_put( rt->hash, ep_name, 1, ep );
876 Given a session id and message type build a key that can be used to look up the rte in the route
877 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
879 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
882 if( sub_id == UNSET_SUBID ) {
883 key = 0xffffffff00000000 | mtype;
885 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);