3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License") ;
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
50 #include <RIC_message_types.h> // needed for route manager messages
56 Passed to a symtab foreach callback to construct a list of pointers from
59 typedef struct thing_list {
60 int error; // if a realloc failed, this will be set
67 // ---- debugging/testing -------------------------------------------------------------------------
70 Dump some stats for an endpoint in the RT. This is generally called to
71 verify endpoints after a table load/change.
73 This is called by the for-each mechanism of the symtab and the prototype is
74 fixe; we don't really use some of the parms, but have dummy references to
75 keep sonar from complaining.
77 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
81 if( (ep = (endpoint_t *) thing) == NULL ) {
85 if( (counter = (int *) vcounter) != NULL ) {
88 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
92 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
96 Called to count meid entries in the table. The meid points to an 'owning' endpoint
97 so we can list what we find
99 See note in ep_stats about dummy refs.
101 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
105 if( (ep = (endpoint_t *) thing) == NULL ) {
109 if( (counter = (int *) vcounter) != NULL ) {
112 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
115 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
119 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
120 the 'source' information and is added to each message.
122 See note above about dummy references.
124 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
128 if( (ep = (endpoint_t *) thing) == NULL ) {
129 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
133 if( (id = (char *) vid) == NULL ) {
137 rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
138 (long long) time( NULL ),
142 ep->scounts[EPSC_GOOD],
143 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
144 ep->scounts[EPSC_FAIL],
145 ep->scounts[EPSC_TRANS] );
149 Dump stats for a route entry in the table.
151 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
153 rtable_ent_t const* rte; // thing is really an rte
157 if( (rte = (rtable_ent_t *) thing) == NULL ) {
158 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
162 if( (counter = (int *) vcounter) != NULL ) {
166 mtype = rte->key & 0xffff;
167 sid = (int) (rte->key >> 32);
169 rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
173 Given a route table, cause some stats to be spit out.
175 static void rt_stats( route_table_t* rt ) {
179 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
183 counter = (int *) malloc( sizeof( int ) );
185 rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
186 rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
187 rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
188 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
190 rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
192 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
193 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
195 rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
197 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
198 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
204 Given a route table, cause endpoint counters to be written to stderr. The id
205 parm is written as the "source" in the output.
207 static void rt_epcounts( route_table_t* rt, char* id ) {
209 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
213 rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table
217 static void dump_tables( uta_ctx_t *ctx ) {
218 if( ctx->old_rtable != NULL ) {
219 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
220 rt_stats( ctx->old_rtable );
222 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
224 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
225 rt_stats( ctx->rtable );
228 // ------------ route manager communication -------------------------------------------------
230 Send a request for a table update to the route manager. Updates come in
231 async, so send and go.
233 pctx is the private context for the thread; ctx is the application context
234 that we need to be able to send the application ID in case rt mgr needs to
235 use it to idenfity us.
237 Returns 0 if we were not able to send a request.
239 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
243 if( ctx->rtg_whid < 0 ) {
247 smsg = rmr_alloc_msg( pctx, 1024 );
249 smsg->mtype = RMRRM_REQ_TABLE;
251 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
252 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
253 smsg->len = strlen( smsg->payload ) + 1;
255 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
256 if( (state = smsg->state) != RMR_OK ) {
257 rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
258 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
262 rmr_free_msg( smsg );
269 Send an ack to the route table manager for a table ID that we are
270 processing. State is 1 for OK, and 0 for failed. Reason might
271 be populated if we know why there was a failure.
273 Context should be the PRIVATE context that we use for messages
274 to route manger and NOT the user's context.
276 If a message buffere is passed we use that and use return to sender
277 assuming that this might be a response to a call and that is needed
278 to send back to the proper calling thread. If msg is nil, we allocate
281 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
283 int payload_size = 1024;
285 if( ctx == NULL || ctx->rtg_whid < 0 ) {
289 if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
294 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
297 smsg = rmr_alloc_msg( ctx, payload_size );
301 smsg->mtype = RMRRM_TABLE_STATE;
303 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
304 table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
306 smsg->len = strlen( smsg->payload ) + 1;
308 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
310 smsg = rmr_rts_msg( ctx, smsg );
312 smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
314 if( (state = smsg->state) != RMR_OK ) {
315 rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
316 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
321 rmr_free_msg( smsg ); // if not our message we must free the leftovers
326 // ---- utility -----------------------------------------------------------------------------------
328 Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
329 must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
331 static char* clip( char* buf ) {
334 while( *buf && isspace( *buf ) ) { // skip leading whitespace
338 if( (tok = strchr( buf, '#' )) != NULL ) {
340 return buf; // just push back; leading comment sym handled there
343 if( isspace( *(tok-1) ) ) {
348 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
355 This accepts a pointer to a nil terminated string, and ensures that there is a
356 newline as the last character. If there is not, a new buffer is allocated and
357 the newline is added. If a new buffer is allocated, the buffer passed in is
358 freed. The function returns a pointer which the caller should use, and must
359 free. In the event of an error, a nil pointer is returned.
361 static char* ensure_nlterm( char* buf ) {
369 nb = buf; // default to returning original as is
376 if( *buf != '\n' ) { // not a newline; realloc
377 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
378 nb = strdup( " \n" );
385 if( buf[len-1] != '\n' ) { // not newline terminated, realloc
386 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
387 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
388 memcpy( nb, buf, len );
389 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
401 Roll the new table into the active and the active into the old table. We
402 must have the lock on the active table to do this. It's possible that there
403 is no active table (first load), so we have to account for that (no locking).
405 static void roll_tables( uta_ctx_t* ctx ) {
407 if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
408 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
409 ctx->old_rtable = ctx->new_rtable;
410 ctx->new_rtable = NULL;
414 if( ctx->rtable != NULL ) { // initially there isn't one, so must check!
415 pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active
416 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
417 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
418 pthread_mutex_unlock( ctx->rtgate );
420 ctx->old_rtable = NULL; // ensure there isn't an old reference
421 ctx->rtable = ctx->new_rtable; // make new the active one
424 ctx->new_rtable = NULL;
428 Given a thing list, extend the array of pointers by 1/2 of the current
429 number allocated. If we cannot realloc an array, then we set the error
430 flag. Unlikely, but will prevent a crash, AND will prevent us from
431 trying to use the table since we couldn't capture everything.
433 static void extend_things( thing_list_t* tl ) {
442 old_alloc = tl->nalloc; // capture current things
443 old_things = tl->things;
444 old_names = tl->names;
446 tl->nalloc += tl->nalloc/2; // new allocation size
448 tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc ); // allocate larger arrays
449 tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
451 if( tl->things == NULL || tl->names == NULL ){ // if either failed, then set error
456 memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
457 memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
463 // ------------ entry update functions ---------------------------------------------------------------
465 Given a message type create a route table entry and add to the hash keyed on the
466 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
467 is the number of group slots to allocate in the entry.
469 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
471 rtable_ent_t* old_rte; // entry which was already in the table for the key
477 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
478 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
481 memset( rte, 0, sizeof( *rte ) );
485 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
490 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
494 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
496 rte->rrgroups = NULL;
499 rte->nrrgroups = nrrgroups;
501 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
502 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
505 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
507 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
512 This accepts partially parsed information from an rte or mse record sent by route manager or read from
514 ts_field is the msg-type,sender field
515 subid is the integer subscription id
516 rr_field is the endpoint information for round robening message over
518 If all goes well, this will add an RTE to the table under construction.
520 The ts_field is checked to see if we should ingest this record. We ingest if one of
522 there is no sender info (a generic entry for all)
523 there is sender and our host:port matches one of the senders
524 the sender info is an IP address that matches one of our IP addresses
526 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
527 rtable_ent_t* rte; // route table entry added
530 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
534 int ngtoks; // number of tokens in the group list
535 int grp; // index into group list
536 int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
537 int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
539 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
540 rr_field = clip( rr_field );
542 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
543 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
544 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
546 key = build_rt_key( subid, atoi( ts_field ) );
548 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
550 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
551 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
552 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
554 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
555 rte->mtype = atoi( ts_field ); // capture mtype for debugging
557 for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
558 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
559 for( i = 0; i < ntoks; i++ ) {
560 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
561 if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
562 uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
567 cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
574 if( DEBUG || (vlevel > 2) ) {
575 rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
581 Trash_entry takes a partially parsed record from the input and
582 will delete the entry if the sender,mtype matches us or it's a
583 generic mtype. The refernce in the new table is removed and the
584 refcounter for the actual rte is decreased. If that ref count is
585 0 then the memory is freed (handled byh the del_rte call).
587 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
588 rtable_ent_t* rte; // route table entry to be 'deleted'
591 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
594 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
598 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
600 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
601 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
602 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
604 key = build_rt_key( subid, atoi( ts_field ) );
605 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
607 if( DEBUG || (vlevel > 1) ) {
608 rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
610 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
611 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
613 if( DEBUG || (vlevel > 1) ) {
614 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
618 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
622 // -------------------------- parse functions --------------------------------------------------
625 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
626 the 'owner' which should be the dns name or IP address of an enpoint
627 the meid_list is a space separated list of me IDs
629 This function assumes the caller has vetted the pointers as needed.
631 For each meid in the list, an entry is pushed into the hash which references the owner
632 endpoint such that when the meid is used to route a message it references the endpoint
635 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
641 endpoint_t* ep; // endpoint struct for the owner
643 owner = clip( owner ); // ditch extra whitespace and trailing comments
644 meid_list = clip( meid_list );
646 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
647 for( i = 0; i < ntoks; i++ ) {
648 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
649 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
650 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
652 rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
658 Given the tokens from an mme_del, delete the listed meid entries from the new
659 table. The list is a space separated list of meids.
661 The meids in the hash reference endpoints which are never deleted and so
662 the only thing that we need to do here is to remove the meid from the hash.
664 This function assumes the caller has vetted the pointers as needed.
666 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
672 if( rtab->hash == NULL ) {
676 meid_list = clip( meid_list );
678 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
679 for( i = 0; i < ntoks; i++ ) {
680 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
681 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
686 Parse a partially parsed meid record. Tokens[0] should be one of:
687 meid_map, mme_ar, mme_del.
689 pctx is the private context needed to return an ack/nack using the provided
690 message buffer with the route managers address info.
692 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
695 if( tokens == NULL || ntoks < 1 ) {
696 return; // silent but should never happen
699 if( ntoks < 2 ) { // must have at least two for any valid request record
700 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
704 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
705 tokens[1] = clip( tokens[1] );
706 if( *(tokens[1]) == 's' ) {
707 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
708 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
709 uta_rt_drop( ctx->new_rtable );
710 ctx->new_rtable = NULL;
711 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as and never made it
714 if( ctx->table_id != NULL ) {
715 free( ctx->table_id );
718 ctx->table_id = strdup( clip( tokens[2] ) );
720 ctx->table_id = NULL;
723 ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid)
724 ctx->new_rtable->mupdates = 0;
726 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
728 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
729 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
730 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
731 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
732 ctx->new_rtable->mupdates, tokens[2] );
733 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
734 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
735 uta_rt_drop( ctx->new_rtable );
736 ctx->new_rtable = NULL;
740 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
743 if( ctx->new_rtable ) {
744 roll_tables( ctx ); // roll active to old, and new to active with proper locking
745 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
746 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
749 if( ctx->old_rtable != NULL ) {
750 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
751 rt_stats( ctx->old_rtable );
753 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
755 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
756 rt_stats( ctx->rtable );
759 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
760 ctx->new_rtable = NULL;
768 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
769 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
773 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
774 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
775 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
778 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
779 ctx->new_rtable->mupdates++;
783 if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated
784 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
785 ctx->new_rtable->mupdates++;
791 Parse a single record recevied from the route table generator, or read
792 from a static route table file. Start records cause a new table to
793 be started (if a partial table was received it is discarded. Table
794 entry records are added to the currenly 'in progress' table, and an
795 end record causes the in progress table to be finalised and the
796 currently active table is replaced.
798 The updated table will be activated when the *|end record is encountered.
799 However, to allow for a "double" update, where both the meid map and the
800 route table must be updated at the same time, the end indication on a
801 route table (new or update) may specifiy "hold" which indicates that meid
802 map entries are to follow and the updated route table should be held as
803 pending until the end of the meid map is received and validated.
805 CAUTION: we are assuming that there is a single route/meid map generator
806 and as such only one type of update is received at a time; in other
807 words, the sender cannot mix update records and if there is more than
808 one sender process they must synchronise to avoid issues.
811 For a RT update, we expect:
812 newrt | start | <table-id>
813 newrt | end | <count>
814 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
815 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
816 mse| <mtype>[,sender] | <sub-id> | %meid
819 For a meid map update we expect:
820 meid_map | start | <table-id>
821 meid_map | end | <count> | <md5-hash>
822 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
823 mme_del | <meid0> <meid1>...<meidn>
826 The pctx is our private context that must be used to send acks/status
827 messages back to the route manager. The regular ctx is the ctx that
828 the user has been given and thus that's where we have to hang the route
829 table we're working with.
831 If mbuf is given, and we need to ack, then we ack using the mbuf and a
832 return to sender call (allows route manager to use wh_call() to send
833 an update and rts is required to get that back to the right thread).
834 If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
837 static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
839 int ntoks; // number of tokens found in something
841 int grp; // group number
842 rtable_ent_t const* rte; // route table entry added
844 char* tok; // pointer into a token or string
851 while( *buf && isspace( *buf ) ) { // skip leading whitespace
854 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
857 memset( tokens, 0, sizeof( tokens ) );
858 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
859 tokens[0] = clip( tokens[0] );
860 switch( *(tokens[0]) ) {
861 case 0: // ignore blanks
863 case '#': // and comment lines
866 case 'd': // del | [sender,]mtype | sub-id
867 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
872 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
876 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
877 ctx->new_rtable->updates++;
880 case 'n': // newrt|{start|end}
881 tokens[1] = clip( tokens[1] );
882 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
884 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
885 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
886 ctx->new_rtable->updates, tokens[2] );
887 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
888 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
889 uta_rt_drop( ctx->new_rtable );
890 ctx->new_rtable = NULL;
895 if( ctx->new_rtable ) {
896 roll_tables( ctx ); // roll active to old, and new to active with proper locking
897 if( DEBUG > 1 || (vlevel > 1) ) {
898 rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
902 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
903 ctx->rtable_ready = 1; // route based sends can now happen
905 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
906 ctx->new_rtable = NULL;
908 } else { // start a new table.
909 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
910 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
912 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
913 uta_rt_drop( ctx->new_rtable );
914 ctx->new_rtable = NULL;
917 if( ctx->table_id != NULL ) {
918 free( ctx->table_id );
921 ctx->table_id = strdup( clip( tokens[2] ) );
923 ctx->table_id = NULL;
926 ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new
927 ctx->new_rtable->updates = 0; // init count of entries received
929 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
933 case 'm': // mse entry or one of the meid_ records
934 if( strcmp( tokens[0], "mse" ) == 0 ) {
935 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
940 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
944 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
945 ctx->new_rtable->updates++;
947 meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
951 case 'r': // assume rt entry
952 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
956 ctx->new_rtable->updates++;
957 if( ntoks > 3 ) { // assume new entry with subid last
958 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
960 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
964 case 'u': // update current table, not a total replacement
965 tokens[1] = clip( tokens[1] );
966 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
967 if( ctx->new_rtable == NULL ) { // update table not in progress
972 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
973 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
974 ctx->new_rtable->updates, tokens[2] );
975 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
976 uta_rt_drop( ctx->new_rtable );
977 ctx->new_rtable = NULL;
982 if( ctx->new_rtable ) {
983 roll_tables( ctx ); // roll active to old, and new to active with proper locking
984 if( DEBUG > 1 || (vlevel > 1) ) {
985 rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
989 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
990 ctx->rtable_ready = 1; // route based sends can now happen
992 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
993 ctx->new_rtable = NULL;
995 } else { // start a new table.
996 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
997 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
998 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
999 uta_rt_drop( ctx->new_rtable );
1000 ctx->new_rtable = NULL;
1004 if( ctx->table_id != NULL ) {
1005 free( ctx->table_id );
1007 ctx->table_id = strdup( clip( tokens[2] ) );
1010 ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table
1011 ctx->new_rtable->updates = 0; // init count of updates received
1013 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1018 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1025 This function attempts to open a static route table in order to create a 'seed'
1026 table during initialisation. The environment variable RMR_SEED_RT is expected
1027 to contain the necessary path to the file. If missing, or if the file is empty,
1028 no route table will be available until one is received from the generator.
1030 This function is probably most useful for testing situations, or extreme
1031 cases where the routes are static.
1033 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1036 char* fbuf; // buffer with file contents
1037 char* rec; // start of the record
1038 char* eor; // end of the record
1039 int rcount = 0; // record count for debug
1041 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1045 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
1046 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1050 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1051 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
1052 if( *eor == '\r' ) {
1053 *eor = '\n'; // will look like a blank line which is ok
1058 while( rec && *rec ) {
1060 if( (eor = strchr( rec, '\n' )) != NULL ) {
1063 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1064 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1069 parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
1074 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
1079 Callback driven for each thing in a symtab. We collect the pointers to those
1080 things for later use (cloning).
1082 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1085 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1089 if( thing == NULL ) {
1090 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1094 tl->names[tl->nused] = name; // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1095 tl->things[tl->nused++] = thing; // save a reference to the thing
1097 if( tl->nused >= tl->nalloc ) {
1098 extend_things( tl ); // not enough allocated
1103 Called to delete a route table entry struct. We delete the array of endpoint
1104 pointers, but NOT the endpoints referenced as those are referenced from
1107 Route table entries can be concurrently referenced by multiple symtabs, so
1108 the actual delete happens only if decrementing the rte's ref count takes it
1109 to 0. Thus, it is safe to call this function across a symtab when cleaning up
1110 the symtab, or overlaying an entry.
1112 This function uses ONLY the pointer to the rte (thing) and ignores the other
1113 information that symtab foreach function passes (st, entry, and data) which
1114 means that it _can_ safetly be used outside of the foreach setting. If
1115 the function is changed to depend on any of these three, then a stand-alone
1116 rte_cleanup() function should be added and referenced by this, and refererences
1117 to this outside of the foreach world should be changed.
1119 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1123 if( (rte = (rtable_ent_t *) thing) == NULL ) {
1124 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1129 if( rte->refs > 0 ) { // something still referencing, so it lives
1133 if( rte->rrgroups ) { // clean up the round robin groups
1134 for( i = 0; i < rte->nrrgroups; i++ ) {
1135 if( rte->rrgroups[i] ) {
1136 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
1137 free( rte->rrgroups[i] ); // but must free the rrg itself too
1142 free( rte->rrgroups );
1145 free( rte ); // finally, drop the potato
1149 Read an entire file into a buffer. We assume for route table files
1150 they will be smallish and so this won't be a problem.
1151 Returns a pointer to the buffer, or nil. Caller must free.
1152 Terminates the buffer with a nil character for string processing.
1154 If we cannot stat the file, we assume it's empty or missing and return
1155 an empty buffer, as opposed to a nil, so the caller can generate defaults
1156 or error if an empty/missing file isn't tolerated.
1158 static char* uta_fib( char const* fname ) {
1160 off_t fsize = 8192; // size of the file
1161 off_t nread; // number of bytes read
1163 char* buf; // input buffer
1165 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1166 if( fstat( fd, &stats ) >= 0 ) {
1167 if( stats.st_size <= 0 ) { // empty file
1171 fsize = stats.st_size; // stat ok, save the file size
1174 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
1178 if( fd < 0 ) { // didn't open or empty
1179 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1187 // add a size limit check here
1189 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
1195 nread = read( fd, buf, fsize );
1196 if( nread < 0 || nread > fsize ) { // failure of some kind
1198 errno = EFBIG; // likely too much to handle
1209 // --------------------- initialisation/creation ---------------------------------------------
1211 Create and initialise a route table; Returns a pointer to the table struct.
1213 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1219 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1223 memset( rt, 0, sizeof( *rt ) );
1225 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1230 rt->gate = ctx->rtgate; // single mutex needed for all route tables
1231 rt->ephash = ctx->ephash; // all route tables share a common endpoint hash
1232 pthread_mutex_init( rt->gate, NULL );
1238 Clones one of the spaces in the given table.
1239 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1240 Space is the space in the old table to copy. Space 0 uses an integer key and
1241 references rte structs. All other spaces use a string key and reference endpoints.
1243 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1244 endpoint_t* ep; // an endpoint (ignore sonar complaint about const*)
1245 rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*)
1246 void* sst; // source symtab
1247 void* nst; // new symtab
1248 thing_list_t things; // things from the space to copy
1250 int free_on_err = 0;
1255 if( nrt == NULL ) { // make a new table if needed
1257 nrt = uta_rt_init( ctx );
1263 if( srt == NULL ) { // source was nil, just give back the new table
1267 things.nalloc = 2048;
1270 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1271 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1272 if( things.things == NULL || things.names == NULL ){
1273 if( things.things != NULL ) {
1274 free( things.things );
1276 if( things.names != NULL ) {
1277 free( things.names );
1281 rmr_sym_free( nrt->hash );
1290 memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1291 memset( things.names, 0, sizeof( char * ) * things.nalloc );
1293 sst = srt->hash; // convenience pointers (src symtab)
1296 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
1297 if( things.error ) { // something happened and capture failed
1298 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1299 free( things.things );
1300 free( things.names );
1302 rmr_sym_free( nrt->hash );
1311 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
1312 for( i = 0; i < things.nused; i++ ) {
1313 if( space ) { // string key, epoint reference
1314 ep = (endpoint_t *) things.things[i];
1315 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
1317 rte = (rtable_ent_t *) things.things[i];
1318 rte->refs++; // rtes can be removed, so we track references
1319 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
1323 free( things.things );
1324 free( (void *) things.names );
1329 Given a destination route table (drt), clone from the source (srt) into it.
1330 If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1331 allocate the drt if that was nil too). If all is true (1), then we will clone both
1332 the MT and the ME spaces; otherwise only the ME space is cloned.
1334 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1335 endpoint_t* ep; // an endpoint
1336 rtable_ent_t* rte; // a route table entry
1343 drt = uta_rt_init( ctx );
1349 drt->ephash = ctx->ephash; // all rts reference the same EP symtab
1350 rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1352 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1359 Prepares the "new" route table for populating. If the old_rtable is not nil, then
1360 we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1361 context to be referenced by the new pointer; the old pointer is set to nil.
1363 If the old table doesn't exist, then a new table is created and the new pointer is
1364 set to reference it.
1366 The ME namespace references endpoints which do not need to be released, therefore we
1367 do not need to run that portion of the table to deref like we do for the RTEs.
1369 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1377 if( (rt = ctx->old_rtable) != NULL ) {
1378 ctx->old_rtable = NULL;
1379 while( rt->ref_count > 0 ) { // wait for all who are using to stop
1380 if( counter++ > 1000 ) {
1381 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" );
1385 usleep( 1000 ); // small sleep to yield the processer if that is needed
1388 if( rt->hash != NULL ) {
1389 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed
1390 rmr_sym_clear( rt->hash ); // clear all entries from the old table
1393 rt->error = 0; // table with errors can be here, so endure clear before attempt to load
1398 rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
1399 if( rt != NULL ) { // very small chance for nil, but not zero, so test
1400 rt->ref_count = 0; // take no chances; ensure it's 0!
1402 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1403 rt = uta_rt_init( ctx ); // must hav something, but mark it in error state
1412 Given a name, find the endpoint struct in the provided route table.
1414 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1416 if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1420 return rmr_sym_get( rt->ephash, ep_name, 1 );
1424 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1425 Does NOT destroy the gate as it's a common gate for ALL route tables.
1427 static void uta_rt_drop( route_table_t* rt ) {
1432 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1433 rmr_sym_free( rt->hash ); // free all of the hash related data
1438 Look up and return the pointer to the endpoint stuct matching the given name.
1439 If not in the hash, a new endpoint is created, added to the hash. Should always
1442 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1445 if( !rt || !ep_name || ! *ep_name ) {
1446 rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1451 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1452 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1453 rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1458 ep->notify = 1; // show notification on first connection failure
1459 ep->open = 0; // not connected
1460 ep->addr = uta_h2ip( ep_name );
1461 ep->name = strdup( ep_name );
1462 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1463 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1465 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1473 Given a session id and message type build a key that can be used to look up the rte in the route
1474 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1476 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1479 if( sub_id == UNSET_SUBID ) {
1480 key = 0xffffffff00000000 | mtype;
1482 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1489 Given a route table and meid string, find the owner (if known). Returns a pointer to
1490 the endpoint struct or nil.
1492 static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
1493 endpoint_t const* ep; // the ep we found in the hash
1495 if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1499 return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1503 This returns a pointer to the currently active route table and ups
1504 the reference count so that the route table is not freed while it
1505 is being used. The caller MUST call release_rt() when finished
1508 Care must be taken: the ctx->rtable pointer _could_ change during the time
1509 between the release of the lock and the return. Therefore we MUST grab
1510 the current pointer when we have the lock so that if it does we don't
1511 return a pointer to the wrong table.
1513 This will return NULL if there is no active table.
1515 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1516 route_table_t* rrt; // return value
1518 if( ctx == NULL || ctx->rtable == NULL ) {
1522 pthread_mutex_lock( ctx->rtgate ); // must hold lock to bump use
1523 rrt = ctx->rtable; // must stash the pointer while we hold lock
1525 pthread_mutex_unlock( ctx->rtgate );
1527 return rrt; // pointer we upped the count with
1531 This will "release" the route table by reducing the use counter
1532 in the table. The table may not be freed until the counter reaches
1533 0, so it's imparative that the pointer be "released" when it is
1534 fetched by get_rt(). Once the caller has released the table it
1535 may not safely use the pointer that it had.
1537 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1538 if( ctx == NULL || rt == NULL ) {
1542 pthread_mutex_lock( ctx->rtgate ); // must hold lock
1543 if( rt->ref_count > 0 ) { // something smells if it's already 0, don't do antyhing if it is
1546 pthread_mutex_unlock( ctx->rtgate );