3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
// Accumulator handed to collect_things() through the symtab foreach calls.
// Code elsewhere in this file uses its things, nalloc and nused members.
// NOTE(review): the member declarations are not visible in this listing.
54 typedef struct thing_list {
60 // ---- debugging/testing -------------------------------------------------------------------------
63 Dump stats for an endpoint in the RT.
// Symtab foreach callback (driven from rt_stats()) which reports one endpoint.
//   thing    - the stored value; treated here as an endpoint_t
//   vcounter - optional pointer to an int supplied by the caller
// st, entry and name are not referenced on any line visible in this listing.
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
// guard for a nil thing; the body of the branch is not visible here
69 if( (ep = (endpoint_t *) thing) == NULL ) {
// the counter is only touched when the caller supplied one
// NOTE(review): branch body not visible; rt_stats() prints *counter as a tally, so presumably it is incremented here -- confirm
73 if( (counter = (int *) vcounter) != NULL ) {
// one line per endpoint on stderr: its name and its open flag
77 fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
81 Dump stats for a route entry in the table.
// Symtab foreach callback (driven from rt_stats()) which reports one route table entry.
//   thing    - the stored value; treated here as an rtable_ent_t
//   vcounter - optional pointer to an int supplied by the caller
// st, entry and name are not referenced on any line visible in this listing.
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
85 rtable_ent_t* rte; // thing is really an rte
// guard for a nil thing; the body of the branch is not visible here
89 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// NOTE(review): branch body not visible; presumably bumps the tally that rt_stats() prints -- confirm
93 if( (counter = (int *) vcounter) != NULL ) {
// split the key for display: low 16 bits shown as mtype, bits 32 and up shown as sid
// NOTE(review): build_rt_key() keeps 32 bits of mtype in the key; the 0xffff mask here shows only the low 16 -- confirm intended
97 mtype = rte->key & 0xffff;
98 sid = (int) (rte->key >> 32);
// NOTE(review): %016lx assumes long is as wide as rte->key; PRIx64 would be the portable form if key is uint64_t -- confirm type
100 fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
104 Given a route table, cause some stats to be spit out.
// Writes a debug summary of a route table to stderr by running ep_stats() over
// symtab class 1 and rte_stats() over symtab class 0, printing a count after each.
//   rt - table to report on
106 static void rt_stats( route_table_t* rt ) {
// message for the nil-table case; the test guarding it is not visible in this listing
110 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
// the tally handed to the callbacks lives on the heap
// NOTE(review): no NULL check on this allocation is visible before *counter is read below; its release is also not visible -- confirm
114 counter = (int *) malloc( sizeof( int ) );
116 fprintf( stderr, "[DBUG] rtstats:\n" );
117 rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table
118 fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
// NOTE(review): any reset of the tally between the two passes is not visible in this listing
121 rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries
122 fprintf( stderr, "[DBUG] %d entries\n", *counter );
128 // ------------------------------------------------------------------------------------------------
130 Little ditty to trim whitespace and trailing comments. Like shell, trailing comments
131 must be at the start of a word (i.e. must be immediately preceded by whitespace).
// Trims a record field: skips leading whitespace, looks for a '#' comment marker
// and trims trailing whitespace.
//   buf - nil terminated string to trim
// Returns a char pointer; the one visible return hands back buf after the leading skip.
// NOTE(review): the statements that write the terminating nil are not visible in this listing.
133 static char* clip( char* buf ) {
// NOTE(review): isspace() is given a plain char here and below; a negative value is undefined, so a cast to unsigned char would be safer
136 while( *buf && isspace( *buf ) ) { // skip leading whitespace
// find the first '#' in what remains
140 if( (tok = strchr( buf, '#' )) != NULL ) {
// NOTE(review): the condition guarding this return is not visible; presumably the '#' is the first character -- confirm
142 return buf; // just push back; leading comment sym handled there
// a '#' only counts as a comment start when the character before it is whitespace
145 if( isspace( *(tok-1) ) ) {
// walk back from the last character while it is whitespace
// NOTE(review): for an empty string strlen() - 1 wraps, so tok starts before buf; the tok > buf test stops the loop, but the pointer arithmetic itself is out of bounds
150 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
158 Given a message type create a route table entry and add to the hash keyed on the
159 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
160 is the number of group slots to allocate in the entry.
// Creates a zeroed route table entry with an array of nrrgroups group slots and maps
// it into rt->hash under key, first pulling any entry already stored under that key.
//   rt        - table being built; rt->hash is used with no NULL check visible in this listing
//   key       - numeric symtab key
//   nrrgroups - number of round robin group slots to allocate
// Returns a pointer to the entry (the return statements are not visible in this listing).
162 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
164 rtable_ent_t* old_rte; // entry which was already in the table for the key
170 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
// NOTE(review): the message prefix says rmr_add_rte although the function is uta_add_rte
171 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
174 memset( rte, 0, sizeof( *rte ) );
// NOTE(review): the handling of a non-positive group count is not visible in this listing
178 if( nrrgroups <= 0 ) {
// NOTE(review): whether rte is released when this second allocation fails is not visible -- confirm there is no leak
182 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
183 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
// every group slot starts out nil
187 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
188 rte->nrrgroups = nrrgroups;
// an entry already stored under this key is pulled out of the hash and handed to del_rte()
190 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
191 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
194 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
// NOTE(review): %lx is used for a uint64_t; PRIx64 would be the portable form
196 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
201 This accepts partially parsed information from a record sent by route manager or read from
203 ts_field is the msg-type,sender field
204 subid is the integer subscription id
205 rr_field is the endpoint information for round robining messages over
207 If all goes well, this will add an RTE to the table under construction.
209 The ts_field is checked to see if we should ingest this record. We ingest if one of
211 there is no sender info (a generic entry for all)
212 there is sender and our host:port matches one of the senders
213 the sender info is an IP address that matches one of our IP addresses
// Adds one route table entry to ctx->new_rtable from a partially parsed record, but
// only when the record applies to this process (see the three-way test below).
//   ctx      - context; my_name, ip_list and new_rtable are read
//   ts_field - "mtype[,sender...]" field; trimmed with clip() first
//   subid    - subscription id combined with the mtype by build_rt_key()
//   rr_field - endpoint groups: groups split on ';', endpoints within a group on ','
//   vlevel   - verbosity; higher values enable the extra stderr messages
215 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
216 rtable_ent_t* rte; // route table entry added
219 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
223 int ngtoks; // number of tokens in the group list
224 int grp; // index into group list
226 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
227 rr_field = clip( rr_field );
// ingest when there is no sender list, our name is in it, or one of our IP addresses is in it
229 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
230 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
231 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
// atoi() stops at the first non-digit, so only the leading mtype of ts_field is converted
233 key = build_rt_key( subid, atoi( ts_field ) );
// NOTE(review): subid is a uint32_t printed with %d and key a uint64_t printed with %lx
235 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
237 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
// NOTE(review): no check of rte against NULL is visible before it is passed to uta_add_ep() below
238 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
// each group becomes one slot in the entry; each endpoint name in the group is added to that slot
240 for( grp = 0; grp < ngtoks; grp++ ) {
241 if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
242 for( i = 0; i < ntoks; i++ ) {
// NOTE(review): this message prints ts_field; tokens[i] (the endpoint being added) looks like the intended argument
243 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field );
244 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
250 if( DEBUG || (vlevel > 2) )
// NOTE(review): tokens is only filled by the uta_tokenise() call on the matched path above; if this is the
// not-matched path (as the message suggests) tokens[1] has not been set in this call -- ts_field looks intended. TODO confirm
251 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
256 Trash_entry takes a partially parsed record from the input and
257 will delete the entry if the sender,mtype matches us or it's a
258 generic mtype. The reference in the new table is removed and the
259 refcounter for the actual rte is decreased. If that ref count is
260 0 then the memory is freed (handled by the del_rte call).
// Removes the entry for mtype/subid from the table under construction when the record
// applies to this process (same three-way sender test that build_entry() uses).
//   ctx      - context; new_rtable->hash is the symtab worked on
//   ts_field - "mtype[,sender...]" field; trimmed with clip() first
//   subid    - subscription id combined with the mtype by build_rt_key()
//   vlevel   - verbosity; values above 1 enable the stderr messages
262 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
263 rtable_ent_t* rte; // route table entry to be 'deleted'
266 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
// nothing can be done without a context, a table in progress, and its hash
269 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
273 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
275 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
276 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
277 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
// atoi() stops at the first non-digit, so only the leading mtype of ts_field is converted
279 key = build_rt_key( subid, atoi( ts_field ) );
280 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
// NOTE(review): the test that separates the found and not-found paths below is not visible in this listing
282 if( DEBUG || (vlevel > 1) ) {
283 fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
285 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
286 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
288 if( DEBUG || (vlevel > 1) ) {
289 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
// record was addressed to some other sender
293 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
298 Parse a single record received from the route table generator, or read
299 from a static route table file. Start records cause a new table to
300 be started (if a partial table was received it is discarded). Table
301 entry records are added to the currently 'in progress' table, and an
302 end record causes the in progress table to be finalised and the
303 currently active table is replaced.
305 We expect one of several types:
307 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
308 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
// Parses one '|' separated route table record and applies it to the context. The first
// character of the first token selects the action: 'd' delete an entry, 'n' start/end a
// full table, 'm' mse entry, 'r' rte entry, 'u' start/end an update of the current table.
//   ctx    - context holding rtable (active), old_rtable (draining) and new_rtable (in progress)
//   buf    - nil terminated record; split in place by uta_tokenise()
//   vlevel - verbosity; higher values enable extra stderr messages
// NOTE(review): the field conversions all use atoi(), which cannot report a malformed number.
310 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
312 int ntoks; // number of tokens found in something
314 int grp; // group number
315 rtable_ent_t* rte; // route table entry added
317 char* gtokens[64]; // groups
318 char* tok; // pointer into a token or string
// NOTE(review): isspace() is given a plain char here and below; a cast to unsigned char would avoid undefined behaviour for negative values
324 while( *buf && isspace( *buf ) ) { // skip leading whitespace
// NOTE(review): for an empty record strlen() - 1 wraps and tok starts before buf; the tok > buf test stops the loop
327 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
// split on '|' into at most 128 fields (the declaration of tokens is not visible in this listing)
330 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
331 switch( *(tokens[0]) ) {
332 case 0: // ignore blanks
334 case '#': // and comment lines
337 case 'd': // del | [sender,]mtype | sub-id
338 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
// NOTE(review): the field count test guarding this warning is not visible in this listing
343 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
347 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
348 ctx->new_rtable->updates++;
351 case 'n': // newrt|{start|end}
// NOTE(review): no check that the record has a second field is visible before tokens[1] is used
352 tokens[1] = clip( tokens[1] );
353 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
354 if( ctx->new_rtable ) {
// rotate: the draining table is dropped, active becomes draining, in progress becomes active
355 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
356 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
357 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
358 ctx->new_rtable = NULL;
359 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
// stats dump of both tables; the condition enabling it is not visible in this listing
362 fprintf( stderr, "[DBUG] old route table:\n" );
363 rt_stats( ctx->old_rtable );
364 fprintf( stderr, "[DBUG] new route table:\n" );
365 rt_stats( ctx->rtable );
368 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
369 ctx->new_rtable = NULL;
371 } else { // start a new table.
372 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
373 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
374 uta_rt_drop( ctx->new_rtable );
// NOTE(review): the test choosing between clone and init is not visible; presumably it is whether ctx->rtable exists -- confirm
378 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
380 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just create empty
382 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
386 case 'm': // assume mse entry
387 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
// NOTE(review): the field count test guarding this warning is not visible in this listing
392 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
// mse field order: mtype[,sender] | sub-id | endpoint groups
396 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
397 ctx->new_rtable->updates++;
400 case 'r': // assume rt entry
401 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
405 ctx->new_rtable->updates++;
// NOTE(review): no check that at least three fields are present is visible before tokens[2] is used
406 if( ntoks > 3 ) { // assume new entry with subid last
407 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
409 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
413 case 'u': // update current table, not a total replacement
// NOTE(review): no check that the record has a second field is visible before tokens[1] is used
414 tokens[1] = clip( tokens[1] );
415 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
416 if( ctx->new_rtable == NULL ) { // update table not in progress
// the end record carries the sender's record count in tokens[2]; a mismatch discards the update
// NOTE(review): no check that the record has a third field is visible before tokens[2] is used
421 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
422 fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
423 ctx->new_rtable->updates, tokens[2] );
424 uta_rt_drop( ctx->new_rtable );
425 ctx->new_rtable = NULL;
430 if( ctx->new_rtable ) {
// same rotation as for a full table
431 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
432 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
433 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
434 ctx->new_rtable = NULL;
435 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
// stats dump of both tables; the condition enabling it is not visible in this listing
438 fprintf( stderr, "[DBUG] old route table:\n" );
439 rt_stats( ctx->old_rtable );
440 fprintf( stderr, "[DBUG] updated route table:\n" );
441 rt_stats( ctx->rtable );
444 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
445 ctx->new_rtable = NULL;
447 } else { // start a new table.
448 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
449 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
450 uta_rt_drop( ctx->new_rtable );
// NOTE(review): the test choosing between clone_all and init is not visible; presumably it is whether ctx->rtable exists -- confirm
454 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
456 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just create empty
// NOTE(review): both allocators above test for malloc failure; no NULL check is visible before this dereference
459 ctx->new_rtable->updates = 0; // init count of updates received
460 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
// any other leading character is reported (debug builds only)
465 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
472 This function attempts to open a static route table in order to create a 'seed'
473 table during initialisation. The environment variable RMR_SEED_RT is expected
474 to contain the necessary path to the file. If missing, or if the file is empty,
475 no route table will be available until one is received from the generator.
477 This function is probably most useful for testing situations, or extreme
478 cases where the routes are static.
// Loads a seed route table: reads the file named by the ENV_SEED_RT environment variable
// into memory with uta_fib() and feeds it to parse_rt_rec() one newline terminated record
// at a time.
//   ctx    - context passed through to parse_rt_rec()
//   vlevel - verbosity passed through to parse_rt_rec()
// NOTE(review): uta_fib() documents that the caller must free the buffer; the release of
// fbuf is not visible in this listing -- confirm it happens on every exit path.
480 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
483 char* fbuf; // buffer with file contents
484 char* rec; // start of the record
485 char* eor; // end of the record
486 int rcount = 0; // record count for debug
// no environment variable; the handling of this case is not visible in this listing
488 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
492 if( (fbuf = uta_fib( fname ) ) == NULL ) { // read file into a single buffer (nil terminated string)
493 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
497 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
// NOTE(review): the test selecting which characters are rewritten is not visible; per the comment it targets carriage returns
498 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
500 *eor = '\n'; // will look like a blank line which is ok
// rec walks record starts; each pass continues just after the newline found for the previous record
504 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
506 if( (eor = strchr( rec, '\n' )) != NULL ) {
// a final record with no newline causes the warnings below
// NOTE(review): these use the tag [WARN] while the open failure above uses [WRN]
509 fprintf( stderr, "[WARN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
// NOTE(review): the format has no separator before %s, so the file name is glued to the word "used"
510 fprintf( stderr, "[WARN] rmr read_static: seed route table not used%s\n", fname );
515 parse_rt_rec( ctx, rec, vlevel );
// NOTE(review): the statement that counts records into rcount is not visible in this listing
518 if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully parsed: %d records\n", rcount );
523 Callback driven for each named thing in a symtab. We collect the pointers to those
524 things for later use (cloning).
// Symtab foreach callback which appends each non-nil thing to the caller's thing list.
//   thing       - the stored value to remember
//   vthing_list - pointer to the caller's thing_list_t
// st, entry and name are not referenced on any line visible in this listing.
526 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
// no list supplied; nothing to collect into
529 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
// nil things are not recorded
533 if( thing == NULL ) {
// NOTE(review): no comparison of nused against nalloc is visible in this listing; the callers allocate 2048 slots,
// so a symtab class holding more things than that would write past the array -- confirm and add a bound check
537 tl->things[tl->nused++] = thing; // save a reference to the thing
541 Called to delete a route table entry struct. We delete the array of endpoint
542 pointers, but NOT the endpoints referenced as those are referenced from
545 Route table entries can be concurrently referenced by multiple symtabs, so
546 the actual delete happens only if decrementing the rte's ref count takes it
547 to 0. Thus, it is safe to call this function across a symtab when cleaning up
548 the symtab, or overlaying an entry.
550 This function uses ONLY the pointer to the rte (thing) and ignores the other
551 information that symtab foreach function passes (st, entry, and data) which
552 means that it _can_ safely be used outside of the foreach setting. If
553 the function is changed to depend on any of these three, then a stand-alone
554 rte_cleanup() function should be added and referenced by this, and references
555 to this outside of the foreach world should be changed.
// Releases a route table entry once nothing references it. Has the symtab foreach
// callback signature but is also called directly with only thing set.
//   thing - the entry; treated here as an rtable_ent_t
// st, entry, name and data are not referenced on any line visible in this listing.
557 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
// a nil entry gets the guard branch; its body is not visible here
561 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// NOTE(review): the statement that lowers refs before this test is not visible in this listing
566 if( rte->refs > 0 ) { // something still referencing, so it lives
570 if( rte->rrgroups ) { // clean up the round robin groups
571 for( i = 0; i < rte->nrrgroups; i++ ) {
// slots which were never filled are nil and are skipped
572 if( rte->rrgroups[i] ) {
// NOTE(review): the release of the group struct itself is not visible in this listing
573 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
// the array of group pointers goes next
577 free( rte->rrgroups );
580 free( rte ); // finally, drop the potato
584 Read an entire file into a buffer. We assume for route table files
585 they will be smallish and so this won't be a problem.
586 Returns a pointer to the buffer, or nil. Caller must free.
587 Terminates the buffer with a nil character for string processing.
589 If we cannot stat the file, we assume it's empty or missing and return
590 an empty buffer, as opposed to a nil, so the caller can generate defaults
591 or error if an empty/missing file isn't tolerated.
// Reads a file into a heap buffer which the caller is to release.
//   fname - path of the file to read
// Returns a char pointer to the buffer (the return statements are not visible in this
// listing). A file which cannot be opened, or is empty, takes the one byte buffer path.
// NOTE(review): because an unopenable file takes the one byte buffer path, a caller testing
// for NULL (read_static_rt() does) appears to see NULL only for allocation or read failure.
593 static char* uta_fib( char* fname ) {
595 off_t fsize = 8192; // size of the file
596 off_t nread; // number of bytes read
598 char* buf; // input buffer
600 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
601 if( fstat( fd, &stats ) >= 0 ) {
// NOTE(review): the body of the empty-file branch is not visible; the fd < 0 test below treats empty like unopened
602 if( stats.st_size <= 0 ) { // empty file
606 fsize = stats.st_size; // stat ok, save the file size
609 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
613 if( fd < 0 ) { // didn't open or empty
// a single byte is enough for an empty string
614 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
622 // add a size limit check here
624 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
// NOTE(review): a single read() may legitimately return fewer than fsize bytes; a short read passes the test below
630 nread = read( fd, buf, fsize );
631 if( nread < 0 || nread > fsize ) { // failure of some kind
// NOTE(review): this overwrites the errno set by read(); release of buf and close of fd on this path are not visible
633 errno = EFBIG; // likely too much to handle
645 Create and initialise a route table; Returns a pointer to the table struct.
// Allocates a route table struct and a symtab of 509 slots for its hash.
// Returns a route_table_t pointer (the return statements are not visible in this listing).
647 static route_table_t* uta_rt_init( ) {
// NOTE(review): malloc() does not zero the struct; initialisation of other members is not visible in this listing
650 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
// NOTE(review): whether rt is released when the symtab cannot be allocated is not visible -- confirm there is no leak
654 if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
663 Clone (sort of) an existing route table. This is done to preserve the endpoint
664 names referenced in a table (and thus existing sessions) when a new set
665 of message type to endpoint name mappings is received. A new route table
666 with only endpoint name references is returned based on the active table in
// Builds a new route table whose symtab holds the class 1 things (endpoints) of srt;
// the endpoint structs themselves are shared, not copied.
//   srt - source table; srt->hash is read (no NULL check is visible in this listing)
// Returns a route_table_t pointer (the return statements are not visible in this listing).
669 static route_table_t* uta_rt_clone( route_table_t* srt ) {
670 endpoint_t* ep; // an endpoint
671 route_table_t* nrt; // new route table
672 void* sst; // source symtab
673 void* nst; // new symtab
681 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
// NOTE(review): cleanup of nrt on this and the later failure path is not visible in this listing
685 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// room for 2048 pointers; collect_things() shows no bound check, so more endpoints than this would overflow -- confirm
690 things.nalloc = 2048;
// NOTE(review): the initialisation of things.nused is not visible in this listing
692 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
693 if( things.things == NULL ) {
699 sst = srt->hash; // convenience pointers (src symtab)
// NOTE(review): the assignment of nst is not visible in this listing; presumably it is nrt->hash -- confirm
702 rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
704 for( i = 0; i < things.nused; i++ ) {
705 ep = (endpoint_t *) things.things[i];
706 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the temporary pointer list is released
709 free( things.things );
714 Clones _all_ of the given route table (references both endpoints AND the route table
715 entries. Needed to support a partial update where some route table entries will not
716 be deleted if not explicitly in the update.
// Builds a new route table whose symtab references both the class 0 things (route table
// entries) and the class 1 things (endpoints) of srt. Entries are shared, so each one
// has its refs count raised.
//   srt - source table; srt->hash is read (no NULL check is visible in this listing)
// Returns a route_table_t pointer (the return statements are not visible in this listing).
718 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
719 endpoint_t* ep; // an endpoint
720 rtable_ent_t* rte; // a route table entry
721 route_table_t* nrt; // new route table
722 void* sst; // source symtab
723 void* nst; // new symtab
724 thing_list_t things0; // things from space 0 (table entries)
725 thing_list_t things1; // things from space 1 (end points)
732 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
// NOTE(review): cleanup of earlier allocations on this and the later failure paths is not visible in this listing
736 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// each list has room for 2048 pointers; collect_things() shows no bound check -- confirm larger tables cannot occur
741 things0.nalloc = 2048;
// NOTE(review): the initialisation of the nused members is not visible in this listing
743 things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
744 if( things0.things == NULL ) {
750 things1.nalloc = 2048;
752 things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
753 if( things1.things == NULL ) {
759 sst = srt->hash; // convenience pointers (src symtab)
// NOTE(review): the assignment of nst is not visible in this listing; presumably it is nrt->hash -- confirm
762 rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
763 rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
// entries first: bump the reference count, then map under the entry's own key
765 for( i = 0; i < things0.nused; i++ ) {
766 rte = (rtable_ent_t *) things0.things[i];
767 rte->refs++; // rtes can be removed, so we track references
768 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
// then the endpoints, stored by name in class 1
771 for( i = 0; i < things1.nused; i++ ) {
772 ep = (endpoint_t *) things1.things[i];
773 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the temporary pointer lists are released
776 free( things0.things );
777 free( things1.things );
782 Given a name, find the endpoint struct in the provided route table.
// Looks up an endpoint by name in class 1 of the table's symtab.
//   rt      - route table to search
//   ep_name - endpoint name used as the symtab key
// Returns whatever rmr_sym_get() yields for the name.
784 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
// a missing table, missing hash, or nil/empty name takes the guard branch (its body is not visible in this listing)
786 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
790 return rmr_sym_get( rt->hash, ep_name, 1 );
794 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
// Drops a route table: del_rte() is run over every class 0 thing in the symtab and then
// the symtab is freed.
//   rt - table to drop
// NOTE(review): parse_rt_rec() passes ctx->old_rtable here without a visible NULL test; the
// nil guard and the release of rt itself are not visible in this listing -- confirm both exist.
796 static void uta_rt_drop( route_table_t* rt ) {
801 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
802 rmr_sym_free( rt->hash ); // free all of the hash related data
807 Look up and return the pointer to the endpoint struct matching the given name.
808 If not in the hash, a new endpoint is created, added to the hash. Should always
// Finds the endpoint named ep_name in the table, creating it and adding it to class 1
// of the symtab when it is not there yet.
//   rt      - route table to search and add to
//   ep_name - endpoint name; also the argument given to uta_h2ip()
// Returns an endpoint_t pointer (the return statements are not visible in this listing).
811 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
814 if( !rt || !ep_name || ! *ep_name ) {
// NOTE(review): %p expects void pointers; rt and ep_name are passed without a cast
815 fprintf( stderr, "[WARN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
820 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
821 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
822 fprintf( stderr, "[WARN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
// NOTE(review): malloc() does not zero the struct; only the members set on the visible lines below are known to be initialised
827 ep->open = 0; // not connected
828 ep->addr = uta_h2ip( ep_name );
// NOTE(review): the strdup() result is stored without a NULL check
829 ep->name = strdup( ep_name );
830 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
// the new endpoint is stored by name in class 1
832 rmr_sym_put( rt->hash, ep_name, 1, ep );
840 Given a session id and message type build a key that can be used to look up the rte in the route
841 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
// Combines a subscription id and a message type into the 64 bit symtab key: the sub id
// goes in the upper 32 bits and the message type in the lower 32 bits.
//   sub_id - subscription id, or UNSET_SUBID when there is none
//   mtype  - message type
// NOTE(review): the tail of this function (including its return) is not visible in this listing.
843 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
846 if( sub_id == UNSET_SUBID ) {
// no sub id: the upper 32 bits are all ones
847 key = 0xffffffff00000000 | mtype;
// sub id shifted into the upper word; mtype masked to 32 bits so it cannot spill into it
849 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);