3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
51 Passed to a symtab foreach callback to construct a list of pointers from
// Opening line of the thing_list structure; none of its members are visible in this excerpt.
// NOTE(review): code later in this file reads and writes .things, .nalloc and .nused through
// thing_list_t variables - presumably the typedef name of this struct; confirm against the full definition.
54 typedef struct thing_list {
60 // ---- debugging/testing -------------------------------------------------------------------------
63 Dump stats for an endpoint in the RT.
// Symtab foreach callback (it is handed to rmr_sym_foreach_class by rt_stats): writes the
// name and open flag of one endpoint to stderr.
//   thing    - treated as an endpoint_t pointer; tested for NULL before it is used
//   vcounter - treated as an int pointer; tested for NULL before it is used
//   st, entry, name - not referenced by any visible line
// NOTE(review): the local declarations and the bodies of both if-blocks are missing from this
// excerpt; presumably a NULL thing returns early and the counter is incremented - confirm.
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
69 if( (ep = (endpoint_t *) thing) == NULL ) {
73 if( (counter = (int *) vcounter) != NULL ) {
77 fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
81 Dump stats for a route entry in the table.
// Symtab foreach callback (it is handed to rmr_sym_foreach_class by rt_stats): writes the key,
// message type, subscription id, group count and reference count of one route table entry to stderr.
//   thing    - treated as an rtable_ent_t pointer; tested for NULL before it is used
//   vcounter - treated as an int pointer; tested for NULL before it is used
// NOTE(review): the remaining declarations and the bodies of both if-blocks are missing from
// this excerpt.
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
85 rtable_ent_t* rte; // thing is really an rte
89 if( (rte = (rtable_ent_t *) thing) == NULL ) {
93 if( (counter = (int *) vcounter) != NULL ) {
// split the key for display: low 16 bits as the message type, bits 32 and up as the subscription id
// NOTE(review): build_rt_key keeps 32 bits of mtype (mask 0xffffffff); the 16 bit mask used here
// can truncate the value that is printed - confirm that this is intended.
97 mtype = rte->key & 0xffff;
98 sid = (int) (rte->key >> 32);
100 fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
104 Given a route table, cause some stats to be spit out.
106 static void rt_stats( route_table_t* rt ) {
110 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
114 counter = (int *) malloc( sizeof( int ) );
116 fprintf( stderr, "[DBUG] rtstats:\n" );
117 rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table
118 fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
121 rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries
122 fprintf( stderr, "[DBUG] %d entries\n", *counter );
128 // ------------------------------------------------------------------------------------------------
130 Little ditty to trim whitespace and trailing comments. Like shell, trailing comments
131 must be at the start of a word (i.e. must be immediately preceded by whitespace).
/*
	Trim leading whitespace, a trailing comment, and trailing whitespace from buf. The
	string is edited in place; nothing is allocated.

	Like shell, a comment starts at a '#' which begins a word (one that is immediately
	preceded by whitespace). A '#' embedded in a word is data. When the first non-blank
	character is '#' the string is returned untouched from that point so that the caller
	can recognise a comment line.

	buf     - writable, nil terminated string.
	returns - pointer to the first non-blank character within buf, or NULL when buf is NULL.
*/
static char* clip( char* buf ) {
	char*	tok;			// candidate comment symbol
	char*	end;			// one past the last character kept

	if( buf == NULL ) {
		return NULL;
	}

	while( *buf && isspace( (unsigned char) *buf ) ) {		// skip leading whitespace
		buf++;
	}

	if( *buf == '#' ) {
		return buf;						// just push back; leading comment sym handled by the caller
	}

	// buf[0] is not '#', so every '#' found below has a character in front of it
	for( tok = strchr( buf, '#' ); tok != NULL; tok = strchr( tok + 1, '#' ) ) {
		if( isspace( (unsigned char) *(tok-1) ) ) {			// starts a word: the comment begins here
			*tok = 0;
			break;
		}
	}

	end = buf + strlen( buf );								// safe for an empty string too
	while( end > buf && isspace( (unsigned char) *(end-1) ) ) {		// trim trailing spaces too
		end--;
	}
	*end = 0;

	return buf;
}
158 Given a message type create a route table entry and add to the hash keyed on the
159 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
160 is the number of group slots to allocate in the entry.
// Allocate a zeroed route table entry with nrrgroups zeroed group slots and map it into
// rt->hash under key. An entry already mapped under key is pulled from the hash and handed
// to del_rte before the new one is mapped.
//   rt        - route table whose hash receives the entry
//   key       - numeric symtab key for the entry
//   nrrgroups - number of round robin group slots to allocate
// NOTE(review): the return statements, the handling of nrrgroups <= 0 and any clean-up on
// the allocation failure paths are missing from this excerpt; the return type suggests that
// the new entry is handed back to the caller - confirm.
162 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
164 rtable_ent_t* old_rte; // entry which was already in the table for the key
170 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
171 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
174 memset( rte, 0, sizeof( *rte ) );
178 if( nrrgroups <= 0 ) {
// one pointer slot per round robin group; slots start out nil
182 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
183 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
187 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
188 rte->nrrgroups = nrrgroups;
// an existing entry for the key is replaced, not merged
190 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
191 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
194 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
196 if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
201 This accepts partially parsed information from a record sent by route manager or read from
203 ts_field is the msg-type,sender field
204 subid is the integer subscription id
205 rr_field is the endpoint information for round robining messages over
207 If all goes well, this will add an RTE to the table under construction.
209 The ts_field is checked to see if we should ingest this record. We ingest if one of
211 there is no sender info (a generic entry for all)
212 there is sender and our host:port matches one of the senders
213 the sender info is an IP address that matches one of our IP addresses
215 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
216 rtable_ent_t* rte; // route table entry added
219 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
223 int ngtoks; // number of tokens in the group list
224 int grp; // index into group list
226 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
227 rr_field = clip( rr_field );
229 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
230 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
231 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
233 key = build_rt_key( subid, atoi( ts_field ) );
235 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
237 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
238 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
240 for( grp = 0; grp < ngtoks; grp++ ) {
241 if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
242 for( i = 0; i < ntoks; i++ ) {
243 if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field );
244 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
250 if( DEBUG || (vlevel > 2) )
251 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
256 Trash_entry takes a partially parsed record from the input and
257 will delete the entry if the sender,mtype matches us or it's a
258 generic mtype. The reference in the new table is removed and the
259 refcounter for the actual rte is decreased. If that ref count is
260 0 then the memory is freed (handled by the del_rte call).
// Remove the route table entry described by a partially parsed delete record from the table
// under construction (ctx->new_rtable).
//   ctx      - context; nothing is attempted when it, its new_rtable, or that table's hash is nil
//   ts_field - message type with optional sender list; clipped in place
//   subid    - subscription id; combined with the message type by build_rt_key
//   vlevel   - verbosity; a value above 1 turns on the debug messages
// The record is acted on under the same three conditions used by build_entry: no sender list,
// our name in the list, or one of our IP addresses in the list.
// NOTE(review): the test which separates the "found" path from the "could not find" path, and
// all else/closing lines, are missing from this excerpt.
262 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
263 rtable_ent_t* rte; // route table entry to be 'deleted'
266 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
269 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
273 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
275 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
276 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
277 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
// atoi reads the leading digits of ts_field, stopping at the first non-digit
279 key = build_rt_key( subid, atoi( ts_field ) );
280 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
282 if( DEBUG || (vlevel > 1) ) {
283 fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
285 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
286 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
288 if( DEBUG || (vlevel > 1) ) {
289 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
293 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
298 Parse a single record received from the route table generator, or read
299 from a static route table file. Start records cause a new table to
300 be started (if a partial table was received it is discarded). Table
301 entry records are added to the currently 'in progress' table, and an
302 end record causes the in progress table to be finalised and the
303 currently active table is replaced.
305 We expect one of several types:
307 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
308 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
// Parse one route table record and apply it to the context.
//   ctx    - context holding the active (rtable), draining (old_rtable) and in progress (new_rtable) tables
//   buf    - the record; it is split in place on '|' by uta_tokenise
//   vlevel - verbosity; a value above 1 turns on the debug messages
// The first character of the first token selects the action:
//   0 or '#' - blank or comment
//   'd'      - delete an entry from the in progress table (trash_entry)
//   'n'      - newrt start/end: build a replacement table; on end it becomes the active table
//   'm'      - mse entry: add an entry with a subscription id (build_entry)
//   'r'      - rte entry: add an entry, with a subscription id only when a fourth field is present
//   'u'      - update start/end: like newrt, but the in progress table starts as a clone of everything
//              and the end record carries the expected number of update records
// NOTE(review): many lines (declarations, field count tests, break/return statements, else and
// closing lines) are missing from this excerpt, so the exact guards around each action cannot
// be confirmed from here.
310 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
312 int ntoks; // number of tokens found in something
314 int grp; // group number
315 rtable_ent_t* rte; // route table entry added
317 char* gtokens[64]; // groups
318 char* tok; // pointer into a token or string
324 while( *buf && isspace( *buf ) ) { // skip leading whitespace
// NOTE(review): when buf is empty at this point strlen( buf ) - 1 wraps and tok starts before buf - confirm this cannot happen
327 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
330 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
331 switch( *(tokens[0]) ) {
332 case 0: // ignore blanks
334 case '#': // and comment lines
337 case 'd': // del | [sender,]mtype | sub-id
338 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
343 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
347 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
348 ctx->new_rtable->updates++;
351 case 'n': // newrt|{start|end}
// NOTE(review): tokens[1] is used here; a test that ntoks is at least 2 is not visible in this excerpt - confirm
352 tokens[1] = clip( tokens[1] );
353 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
354 if( ctx->new_rtable ) {
// rotate the tables: old is dropped, active becomes old, in progress becomes active
355 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
356 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
357 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
358 ctx->new_rtable = NULL;
359 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
362 fprintf( stderr, "[DBUG] old route table:\n" );
363 rt_stats( ctx->old_rtable );
364 fprintf( stderr, "[DBUG] new route table:\n" );
365 rt_stats( ctx->rtable );
368 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
369 ctx->new_rtable = NULL;
371 } else { // start a new table.
372 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
373 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
374 uta_rt_drop( ctx->new_rtable );
// NOTE(review): the condition choosing between the clone and the empty table is missing from this excerpt
378 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
380 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just create empty
382 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
386 case 'm': // assume mse entry
387 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
392 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
// mse field order: mtype[,sender] | sub-id | endpoint groups
396 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
397 ctx->new_rtable->updates++;
400 case 'r': // assume rt entry
401 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
405 ctx->new_rtable->updates++;
// NOTE(review): tokens[1] and tokens[2] are used below; a minimum field count test is not visible in this excerpt - confirm
406 if( ntoks > 3 ) { // assume new entry with subid last
407 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
409 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
413 case 'u': // update current table, not a total replacement
414 tokens[1] = clip( tokens[1] );
415 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
416 if( ctx->new_rtable == NULL ) { // update table not in progress
// the end record carries the number of records sent; a mismatch discards the whole update
421 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
422 fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
423 ctx->new_rtable->updates, tokens[2] );
424 uta_rt_drop( ctx->new_rtable );
425 ctx->new_rtable = NULL;
430 if( ctx->new_rtable ) {
431 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
432 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
433 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
434 ctx->new_rtable = NULL;
435 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
438 fprintf( stderr, "[DBUG] old route table:\n" );
439 rt_stats( ctx->old_rtable );
440 fprintf( stderr, "[DBUG] updated route table:\n" );
441 rt_stats( ctx->rtable );
444 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
445 ctx->new_rtable = NULL;
447 } else { // start a new table.
448 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
449 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
450 uta_rt_drop( ctx->new_rtable );
454 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
456 ctx->new_rtable = uta_rt_init( ); // don't have one yet, just create empty
// NOTE(review): uta_rt_init and uta_rt_clone_all contain malloc failure tests; a nil test before this dereference is not visible - confirm
459 ctx->new_rtable->updates = 0; // init count of updates received
460 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
465 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
472 This function attempts to open a static route table in order to create a 'seed'
473 table during initialisation. The environment variable RMR_SEED_RT is expected
474 to contain the necessary path to the file. If missing, or if the file is empty,
475 no route table will be available until one is received from the generator.
477 This function is probably most useful for testing situations, or extreme
478 cases where the routes are static.
480 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
483 char* fbuf; // buffer with file contents
484 char* rec; // start of the record
485 char* eor; // end of the record
486 int rcount = 0; // record count for debug
488 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
492 if( (fbuf = uta_fib( fname ) ) == NULL ) { // read file into a single buffer
493 fprintf( stderr, "[WRN] seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
497 if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully opened: %s\n", fname );
498 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
500 if( (eor = strchr( rec, '\n' )) != NULL ) {
504 parse_rt_rec( ctx, rec, vlevel );
507 if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully parsed: %d records\n", rcount );
512 Callback driven for each named thing in a symtab. We collect the pointers to those
513 things for later use (cloning).
515 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
518 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
522 if( thing == NULL ) {
526 tl->things[tl->nused++] = thing; // save a reference to the thing
530 Called to delete a route table entry struct. We delete the array of endpoint
531 pointers, but NOT the endpoints referenced as those are referenced from
534 Route table entries can be concurrently referenced by multiple symtabs, so
535 the actual delete happens only if decrementing the rte's ref count takes it
536 to 0. Thus, it is safe to call this function across a symtab when cleaning up
537 the symtab, or overlaying an entry.
539 This function uses ONLY the pointer to the rte (thing) and ignores the other
540 information that symtab foreach function passes (st, entry, and data) which
541 means that it _can_ safely be used outside of the foreach setting. If
542 the function is changed to depend on any of these three, then a stand-alone
543 rte_cleanup() function should be added and referenced by this, and references
544 to this outside of the foreach world should be changed.
// Release a route table entry. The signature matches the symtab foreach callbacks; other
// functions in this file also call it directly with every argument but thing set to NULL.
//   thing - treated as an rtable_ent_t pointer; tested for NULL before it is used
//   st, entry, name, data - not referenced by any visible line
// While refs is above zero the entry is kept. Otherwise the endpoint pointer list (epts) of
// each round robin group is freed, then the group array, then the entry itself. The endpoints
// that the lists point at are not freed here.
// NOTE(review): the line which changes refs before it is tested, and any free of the group
// structs themselves, are missing from this excerpt - confirm.
546 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
550 if( (rte = (rtable_ent_t *) thing) == NULL ) {
555 if( rte->refs > 0 ) { // something still referencing, so it lives
559 if( rte->rrgroups ) { // clean up the round robin groups
560 for( i = 0; i < rte->nrrgroups; i++ ) {
561 if( rte->rrgroups[i] ) {
562 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
566 free( rte->rrgroups );
569 free( rte ); // finally, drop the potato
573 Read an entire file into a buffer. We assume for route table files
574 they will be smallish and so this won't be a problem.
575 Returns a pointer to the buffer, or nil. Caller must free.
576 Terminates the buffer with a nil character for string processing.
578 If we cannot stat the file, we assume it's empty or missing and return
579 an empty buffer, as opposed to a nil, so the caller can generate defaults
580 or error if an empty/missing file isn't tolerated.
/*
	Read an entire file into a freshly allocated, nil terminated buffer. Route table
	files are expected to be smallish, so the whole file is held in memory.

	fname   - path of the file to read
	returns - the buffer, which the caller must free, or NULL on failure with errno set:
	          ENOMEM when the buffer cannot be allocated, otherwise the value left by
	          the failing read().

	When the file cannot be opened, is empty, or fname is nil, an empty string (not
	NULL) is returned so that the caller can generate defaults or complain if an
	empty/missing file is not tolerated. When the file opens but cannot be stat'd at
	most 8k is read.

	read() is allowed to return fewer bytes than asked for, so it is called until the
	expected size has been collected or end of file is reached.
*/
static char* uta_fib( char* fname ) {
	struct stat	stats;
	off_t		fsize = 8192;		// number of bytes we expect; the default applies when stat fails
	off_t		total = 0;			// number of bytes collected so far
	ssize_t		nread = 0;			// result of the most recent read
	int			fd = -1;
	int			saved_errno;
	char*		buf;				// input buffer

	if( fname != NULL && (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {					// empty file
				close( fd );
				fd = -1;
			} else {
				fsize = stats.st_size;					// stat ok, save the file size
			}
		} else {
			fsize = 8192;								// stat failed, leave the file open and try to read a default max of 8k
		}
	}

	if( fd < 0 ) {										// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
			return NULL;
		}

		*buf = 0;
		return buf;
	}

	if( (buf = (char *) malloc( (size_t) fsize + 1 )) == NULL ) {		// enough to add nil char to make string
		close( fd );
		errno = ENOMEM;
		return NULL;
	}

	while( total < fsize ) {
		nread = read( fd, buf + total, (size_t) (fsize - total) );
		if( nread < 0 && errno == EINTR ) {				// interrupted before any data arrived; just retry
			continue;
		}
		if( nread <= 0 ) {								// hard error, or end of file
			break;
		}
		total += nread;
	}

	if( nread < 0 ) {									// failure of some kind
		saved_errno = errno;							// free/close must not hide the real reason
		free( buf );
		close( fd );
		errno = saved_errno;
		return NULL;
	}

	buf[total] = 0;
	close( fd );
	return buf;
}
634 Create and initialise a route table; Returns a pointer to the table struct.
// Create an empty route table: allocate the route_table_t and give it a symtab created by
// rmr_sym_alloc with a size of 509.
// NOTE(review): the return statements and the clean-up on the two failure paths are missing
// from this excerpt; presumably NULL is returned on failure and the table otherwise - confirm.
636 static route_table_t* uta_rt_init( ) {
639 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
643 if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
652 Clone (sort of) an existing route table. This is done to preserve the endpoint
653 names referenced in a table (and thus existing sessions) when a new set
654 of message type to endpoint name mappings is received. A new route table
655 with only endpoint name references is returned based on the active table in
// Build a new route table which references the endpoints (symtab class 1) of srt and none
// of its route table entries.
//   srt - source table; its hash is read, and no visible line modifies it
// The endpoints are first gathered into a temporary list by collect_things and then put
// into the new symtab under their names; the endpoint structs are shared, not copied.
// NOTE(review): the return statements, the assignment of nst, the initialisation of
// things.nused, any nil test on srt and the failure clean-up are missing from this excerpt.
658 static route_table_t* uta_rt_clone( route_table_t* srt ) {
659 endpoint_t* ep; // an endpoint
660 route_table_t* nrt; // new route table
661 void* sst; // source symtab
662 void* nst; // new symtab
670 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
674 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// fixed capacity for the temporary list of endpoint pointers
679 things.nalloc = 2048;
681 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
682 if( things.things == NULL ) {
688 sst = srt->hash; // convenience pointers (src symtab)
691 rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
693 for( i = 0; i < things.nused; i++ ) {
694 ep = (endpoint_t *) things.things[i];
695 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the temporary list is released; the endpoints stay referenced by both tables
698 free( things.things );
703 Clones _all_ of the given route table (references both endpoints AND the route table
704 entries. Needed to support a partial update where some route table entries will not
705 be deleted if not explicitly in the update.
// Build a new route table which references both the route table entries (symtab class 0)
// and the endpoints (symtab class 1) of srt.
//   srt - source table; its hash is read, and the refs count of each of its entries is incremented
// Entries and endpoints are gathered into two temporary lists by collect_things. Each entry
// is mapped into the new symtab under its own key and each endpoint is put under its name.
// The structs themselves are shared between the tables, not copied.
// NOTE(review): the return statements, the assignment of nst, the initialisation of the
// nused fields, any nil test on srt and the failure clean-up are missing from this excerpt.
707 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
708 endpoint_t* ep; // an endpoint
709 rtable_ent_t* rte; // a route table entry
710 route_table_t* nrt; // new route table
711 void* sst; // source symtab
712 void* nst; // new symtab
713 thing_list_t things0; // things from space 0 (table entries)
714 thing_list_t things1; // things from space 1 (end points)
721 if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
725 if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
// fixed capacity for each of the two temporary lists
730 things0.nalloc = 2048;
732 things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
733 if( things0.things == NULL ) {
739 things1.nalloc = 2048;
741 things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
742 if( things1.things == NULL ) {
748 sst = srt->hash; // convenience pointers (src symtab)
751 rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
752 rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
754 for( i = 0; i < things0.nused; i++ ) {
755 rte = (rtable_ent_t *) things0.things[i];
// the entry is now reachable from one more table; del_rte keeps an entry while refs is above zero
756 rte->refs++; // rtes can be removed, so we track references
757 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
760 for( i = 0; i < things1.nused; i++ ) {
761 ep = (endpoint_t *) things1.things[i];
762 rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
// only the temporary lists are released
765 free( things0.things );
766 free( things1.things );
771 Given a name, find the endpoint struct in the provided route table.
// Look up an endpoint by name in the given route table.
//   rt      - table to search
//   ep_name - name of the endpoint
// Returns what rmr_sym_get yields for ep_name in symtab class 1 of rt->hash. A nil table,
// nil hash, nil name or empty name is caught by the guard before the lookup.
// NOTE(review): the body of the guard is missing from this excerpt; presumably it returns NULL - confirm.
773 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
775 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
779 return rmr_sym_get( rt->hash, ep_name, 1 );
783 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
// Dispose of a route table: del_rte is run over every class 0 thing (route table entry) in
// the symtab and then the symtab itself is freed.
//   rt - table to drop
// NOTE(review): any nil test on rt and any free of the route_table_t struct are missing from
// this excerpt; callers in this file pass ctx->old_rtable which may well be nil - confirm.
785 static void uta_rt_drop( route_table_t* rt ) {
790 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
791 rmr_sym_free( rt->hash ); // free all of the hash related data
796 Look up and return the pointer to the endpoint struct matching the given name.
797 If not in the hash, a new endpoint is created, added to the hash. Should always
// Find the endpoint named ep_name in rt, creating it and adding it to the symtab (class 1)
// when it is not there yet.
//   rt      - route table to search and add to
//   ep_name - endpoint name; a nil or empty name is rejected with a warning
// A new endpoint starts with open set to 0, addr set from uta_h2ip( ep_name ), name set to a
// strdup copy of ep_name and gate initialised as a mutex with default attributes.
// NOTE(review): the return statements are missing from this excerpt, and the strdup result
// is not tested by any visible line.
800 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
803 if( !rt || !ep_name || ! *ep_name ) {
804 fprintf( stderr, "[WARN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
809 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
810 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
811 fprintf( stderr, "[WARN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
// only the four fields below are set by visible lines
816 ep->open = 0; // not connected
817 ep->addr = uta_h2ip( ep_name );
818 ep->name = strdup( ep_name );
819 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
821 rmr_sym_put( rt->hash, ep_name, 1, ep );
829 Given a session id and message type build a key that can be used to look up the rte in the route
830 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
// Build the 64 bit symtab key for a route table entry from a subscription id and a message type.
//   sub_id - subscription id, or UNSET_SUBID when there is none
//   mtype  - message type
// With UNSET_SUBID the upper 32 bits of the key are all ones; otherwise they hold sub_id.
// The low 32 bits hold mtype in both cases.
// NOTE(review): the declaration of key, the else line and the return statement fall outside
// this excerpt.
832 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
835 if( sub_id == UNSET_SUBID ) {
836 key = 0xffffffff00000000 | mtype;
838 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);