3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License") ;
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
49 #include <immintrin.h>
52 #include <RIC_message_types.h> // needed for route manager messages
58 Passed to a symtab foreach callback to construct a list of pointers from
typedef struct thing_list { // accumulates symtab entries into flat arrays (grown by extend_things)
int error; // if a realloc failed, this will be set
69 // ---- debugging/testing -------------------------------------------------------------------------
72 Dump some stats for an endpoint in the RT. This is generally called to
73 verify endpoints after a table load/change.
75 This is called by the for-each mechanism of the symtab and the prototype is
fixed; we don't really use some of the parms, but have dummy references to
77 keep sonar from complaining.
79 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
83 if( (ep = (endpoint_t *) thing) == NULL ) {
87 if( (counter = (int *) vcounter) != NULL ) {
90 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
94 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
98 Called to count meid entries in the table. The meid points to an 'owning' endpoint
99 so we can list what we find
101 See note in ep_stats about dummy refs.
103 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
107 if( (ep = (endpoint_t *) thing) == NULL ) {
111 if( (counter = (int *) vcounter) != NULL ) {
114 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
117 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
121 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
122 the 'source' information and is added to each message.
124 See note above about dummy references.
static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
if( (ep = (endpoint_t *) thing) == NULL ) { // thing is really the endpoint
rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
if( (id = (char *) vid) == NULL ) { // vid is the 'source' string stamped on each line
// one INFO line per endpoint: timestamp, source id, target, open state and send counters
rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
(long long) time( NULL ),
ep->scounts[EPSC_GOOD],
ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS], // total failures reported = hard + soft (transient)
ep->scounts[EPSC_FAIL],
ep->scounts[EPSC_TRANS] );
151 Dump stats for a route entry in the table.
static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
rtable_ent_t const* rte; // thing is really an rte
if( (rte = (rtable_ent_t *) thing) == NULL ) {
rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
if( (counter = (int *) vcounter) != NULL ) { // bump the caller's entry count when given
mtype = rte->key & 0xffff; // low order bits of the key carry the message type
sid = (int) (rte->key >> 32); // upper word carries the subscription id
rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
175 Given a route table, cause some stats to be spit out.
static void rt_stats( route_table_t* rt ) {
rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
// shared counter handed to each for-each callback; reset between passes
// NOTE(review): malloc result is used without a visible nil check or free here — confirm elsewhere
counter = (int *) malloc( sizeof( int ) );
rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
206 Given a route table, cause endpoint counters to be written to stderr. The id
207 parm is written as the "source" in the output.
static void rt_epcounts( route_table_t* rt, char* id ) {
rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
// NOTE(review): literal class 1 here where rt_stats uses RT_NAME_SPACE — confirm they are the same value
rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table
// Emit stats for both the old (draining) and the newly active route tables.
static void dump_tables( uta_ctx_t *ctx ) {
if( ctx->old_rtable != NULL ) {
rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
rt_stats( ctx->old_rtable );
rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
rt_stats( ctx->rtable );
230 // ------------ route manager communication -------------------------------------------------
232 Send a request for a table update to the route manager. Updates come in
233 async, so send and go.
235 pctx is the private context for the thread; ctx is the application context
236 that we need to be able to send the application ID in case rt mgr needs to
use it to identify us.
239 Returns 0 if we were not able to send a request.
static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
if( ctx->rtg_whid < 0 ) { // no open wormhole to the route manager; cannot ask
smsg = rmr_alloc_msg( pctx, 1024 ); // allocated on the PRIVATE context so user flow is untouched
smsg->mtype = RMRRM_REQ_TABLE; // request a complete table from route manager
snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
smsg->len = strlen( smsg->payload ) + 1;
smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg ); // send and capture the returned buffer
if( (state = smsg->state) != RMR_OK ) {
rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
rmr_free_msg( smsg );
271 Send an ack to the route table manager for a table ID that we are
272 processing. State is 1 for OK, and 0 for failed. Reason might
273 be populated if we know why there was a failure.
275 Context should be the PRIVATE context that we use for messages
276 to route manger and NOT the user's context.
If a message buffer is passed we use that and use return to sender
279 assuming that this might be a response to a call and that is needed
280 to send back to the proper calling thread. If msg is nil, we allocate
static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
int payload_size = 1024;
if( ctx == NULL || ctx->rtg_whid < 0 ) { // no context or no wormhole; nothing we can do
if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
smsg = rmr_alloc_msg( ctx, payload_size ); // no caller buffer given; allocate our own
smsg->mtype = RMRRM_TABLE_STATE;
// payload: "OK|ERR <table-id> [reason]"
snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
smsg->len = strlen( smsg->payload ) + 1;
rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
smsg = rmr_rts_msg( ctx, smsg ); // return to sender when responding to a wh_call
smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg ); // otherwise straight out the wormhole
if( (state = smsg->state) != RMR_OK ) {
rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
rmr_free_msg( smsg ); // if not our message we must free the leftovers
328 // ---- alarm generation --------------------------------------------------------------------------
331 Given the user's context (not the thread private context) look to see if the application isn't
332 working fast enough and we're dropping messages. If the drop counter has changed since the last
333 peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
334 set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
336 The private context is what we use to send so as not to interfere with the user flow.
static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
static int alarm_raised = 0;
// NOTE(review): ok2clear stores a time(NULL) value in an int — narrows 64-bit time_t; fine until 2038, confirm intended
static int ok2clear = 0; // time that we can clear
static int lastd = 0; // the last counter value so we can compute delta
static int prob_id = 0; // problem ID we assume alarm manager handles dups between processes
rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
if( ! alarm_raised ) {
if( uctx->dcount - lastd == 0 ) { // not actively dropping, ok to do nothing
uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
if( uctx->dcount - lastd != 0 ) { // still dropping or dropping again; we've alarmed so nothing to do
lastd = uctx->dcount;
ok2clear = 0; // reset the timer
if( ok2clear == 0 ) { // first round where not dropping
ok2clear = time( NULL ) + 60; // we'll clear the alarm in 60s
if( time( NULL ) > ok2clear ) { // things still stable after expiry
rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
373 // ---- utility -----------------------------------------------------------------------------------
375 Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
must be at the start of a word (i.e. must be immediately preceded by whitespace).
static char* clip( char* buf ) {
while( *buf && isspace( *buf ) ) { // skip leading whitespace
if( (tok = strchr( buf, '#' )) != NULL ) { // possible trailing comment
return buf; // just push back; leading comment sym handled there
if( isspace( *(tok-1) ) ) { // comment sym only honoured when preceded by whitespace
for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
402 This accepts a pointer to a nil terminated string, and ensures that there is a
403 newline as the last character. If there is not, a new buffer is allocated and
404 the newline is added. If a new buffer is allocated, the buffer passed in is
405 freed. The function returns a pointer which the caller should use, and must
406 free. In the event of an error, a nil pointer is returned.
static char* ensure_nlterm( char* buf ) {
nb = buf; // default to returning original as is
if( *buf != '\n' ) { // not a newline; realloc
rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
nb = strdup( " \n" ); // single-character input; replace with blank + newline
if( buf[len-1] != '\n' ) { // not newline terminated, realloc
rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
memcpy( nb, buf, len );
*(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
448 Roll the new table into the active and the active into the old table. We
449 must have the lock on the active table to do this. It's possible that there
450 is no active table (first load), so we have to account for that (no locking).
static void roll_tables( uta_ctx_t* ctx ) {
if( ctx->new_rtable == NULL || ctx->new_rtable->error ) { // nothing usable to roll in
rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
ctx->old_rtable = ctx->new_rtable; // park the bad/partial table as old so it can be reclaimed
ctx->new_rtable = NULL;
if( ctx->rtable != NULL ) { // initially there isn't one, so must check!
pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active
ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
pthread_mutex_unlock( ctx->rtgate );
ctx->old_rtable = NULL; // ensure there isn't an old reference
ctx->rtable = ctx->new_rtable; // make new the active one
ctx->new_rtable = NULL;
475 Given a thing list, extend the array of pointers by 1/2 of the current
476 number allocated. If we cannot realloc an array, then we set the error
477 flag. Unlikely, but will prevent a crash, AND will prevent us from
478 trying to use the table since we couldn't capture everything.
static void extend_things( thing_list_t* tl ) {
old_alloc = tl->nalloc; // capture current things
old_things = tl->things;
old_names = tl->names;
tl->nalloc += tl->nalloc/2; // new allocation size (grow by 50%)
tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc ); // allocate larger arrays
tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
if( tl->things == NULL || tl->names == NULL ){ // if either failed, then set error
memcpy( tl->things, old_things, sizeof( void * ) * old_alloc ); // copy existing pointers into the larger arrays
memcpy( tl->names, old_names, sizeof( void * ) * old_alloc ); // NOTE(review): sizeof(void*) used for a char* array; sizes match on supported platforms
510 // ------------ entry update functions ---------------------------------------------------------------
512 Given a message type create a route table entry and add to the hash keyed on the
513 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
514 is the number of group slots to allocate in the entry.
static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
rtable_ent_t* old_rte; // entry which was already in the table for the key
if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
memset( rte, 0, sizeof( *rte ) );
if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
rte->rrgroups = NULL; // no round robin groups for this entry
rte->nrrgroups = nrrgroups;
if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) { // replacing an existing entry for this key
del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
559 This accepts partially parsed information from an rte or mse record sent by route manager or read from
561 ts_field is the msg-type,sender field
562 subid is the integer subscription id
563 rr_field is the endpoint information for round robening message over
565 If all goes well, this will add an RTE to the table under construction.
567 The ts_field is checked to see if we should ingest this record. We ingest if one of
569 there is no sender info (a generic entry for all)
570 there is sender and our host:port matches one of the senders
571 the sender info is an IP address that matches one of our IP addresses
static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
rtable_ent_t* rte; // route table entry added
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
int ngtoks; // number of tokens in the group list
int grp; // index into group list
int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
rr_field = clip( rr_field );
// ingest only when the record is generic (no sender list) or names us by host:port or IP
if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
key = build_rt_key( subid, atoi( ts_field ) ); // combine subid and mtype into the symtab key
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
if( strcmp( gtokens[0], "%meid" ) == 0 ) {
ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
// NOTE(review): rte dereferenced without a nil check, yet uta_add_rte can return nil on malloc failure — confirm
rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
rte->mtype = atoi( ts_field ); // capture mtype for debugging
for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
int i; // avoid sonar grumbling by defining this here
if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
for( i = 0; i < ntoks; i++ ) {
if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
if( DEBUG || (vlevel > 2) ) {
rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
629 Trash_entry takes a partially parsed record from the input and
630 will delete the entry if the sender,mtype matches us or it's a
generic mtype. The reference in the new table is removed and the
632 refcounter for the actual rte is decreased. If that ref count is
0 then the memory is freed (handled by the del_rte call).
static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
rtable_ent_t* rte; // route table entry to be 'deleted'
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) { // nothing under construction; nothing to delete from
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
// same sender matching rules as build_entry: generic record, our name, or one of our IPs
if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
key = build_rt_key( subid, atoi( ts_field ) );
rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
if( DEBUG || (vlevel > 1) ) {
rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
if( DEBUG || (vlevel > 1) ) {
rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
670 // -------------------------- parse functions --------------------------------------------------
673 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
the 'owner' which should be the dns name or IP address of an endpoint
675 the meid_list is a space separated list of me IDs
677 This function assumes the caller has vetted the pointers as needed.
679 For each meid in the list, an entry is pushed into the hash which references the owner
680 endpoint such that when the meid is used to route a message it references the endpoint
static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
endpoint_t* ep; // endpoint struct for the owner
owner = clip( owner ); // ditch extra whitespace and trailing comments
meid_list = clip( meid_list );
ntoks = uta_tokenise( meid_list, tokens, 128, ' ' ); // meids are space separated
for( i = 0; i < ntoks; i++ ) {
if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) { // find or create the owning endpoint
state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
706 Given the tokens from an mme_del, delete the listed meid entries from the new
707 table. The list is a space separated list of meids.
709 The meids in the hash reference endpoints which are never deleted and so
710 the only thing that we need to do here is to remove the meid from the hash.
712 This function assumes the caller has vetted the pointers as needed.
static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
if( rtab->hash == NULL ) { // no hash; nothing to delete from
meid_list = clip( meid_list );
ntoks = uta_tokenise( meid_list, tokens, 128, ' ' ); // meids are space separated
for( i = 0; i < ntoks; i++ ) {
rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
734 Parse a partially parsed meid record. Tokens[0] should be one of:
735 meid_map, mme_ar, mme_del.
737 pctx is the private context needed to return an ack/nack using the provided
738 message buffer with the route managers address info.
740 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
743 if( tokens == NULL || ntoks < 1 ) {
744 return; // silent but should never happen
747 if( ntoks < 2 ) { // must have at least two for any valid request record
748 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
752 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
753 tokens[1] = clip( tokens[1] );
754 if( *(tokens[1]) == 's' ) {
755 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
756 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
757 uta_rt_drop( ctx->new_rtable );
758 ctx->new_rtable = NULL;
759 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as and never made it
762 if( ctx->table_id != NULL ) {
763 free( ctx->table_id );
766 ctx->table_id = strdup( clip( tokens[2] ) );
768 ctx->table_id = NULL;
771 ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid)
772 ctx->new_rtable->mupdates = 0;
774 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
776 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
777 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
778 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
779 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
780 ctx->new_rtable->mupdates, tokens[2] );
781 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
782 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
783 uta_rt_drop( ctx->new_rtable );
784 ctx->new_rtable = NULL;
788 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
791 if( ctx->new_rtable ) {
792 roll_tables( ctx ); // roll active to old, and new to active with proper locking
793 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
794 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
797 if( ctx->old_rtable != NULL ) {
798 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
799 rt_stats( ctx->old_rtable );
801 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
803 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
804 rt_stats( ctx->rtable );
807 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
808 ctx->new_rtable = NULL;
816 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
817 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
821 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
822 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
823 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
826 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
827 ctx->new_rtable->mupdates++;
831 if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated
832 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
833 ctx->new_rtable->mupdates++;
839 This will close the current table snarf file (in *.inc) and open a new one.
The current one is renamed. The final file name is determined by the setting of
841 RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
842 an additional extension of .snarf. If neither seed or snarf environment vars are
843 set then this does nothing.
845 If this is called before the tmp snarf file is opened, then this just opens the file.
static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
static int ok2warn = 0; // some warnings squelched on first call
char* seed_fname; // the filename from env
char tfname[512]; // temp fname
char wfname[512]; // working buffer for filename
char* snarf_fname = NULL; // prevent overlay of the static table if snarf_rt not given
if( (snarf_fname = getenv( ENV_STASH_RT )) == NULL ) { // specific place to stash the rt not given
if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) { // no seed, we leave in the default file
memset( wfname, 0, sizeof( wfname ) );
snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname ); // derive the stash name from the seed name
snarf_fname = wfname;
if( snarf_fname == NULL ) { // neither env var set; nowhere to save
rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
memset( tfname, 0, sizeof( tfname ) );
snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname ); // must ensure tmp file is moveable
if( ctx->snarf_rt_fd >= 0 ) { // a capture file is currently open; finish and roll it
char* msg= "### captured from route manager\n";
write( ctx->snarf_rt_fd, msg, strlen( msg ) ); // NOTE(review): write() result ignored; best-effort capture — confirm acceptable
if( close( ctx->snarf_rt_fd ) < 0 ) {
rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
if( unlink( snarf_fname ) < 0 && ok2warn ) { // first time through this can fail and we ignore it
rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
if( rename( tfname, snarf_fname ) ) { // non-zero return indicates failure
rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 ); // (re)open the working capture file
if( ctx->snarf_rt_fd < 0 ) {
rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
904 Parse a single record recevied from the route table generator, or read
905 from a static route table file. Start records cause a new table to
be started (if a partial table was received it is discarded). Table
907 entry records are added to the currenly 'in progress' table, and an
908 end record causes the in progress table to be finalised and the
909 currently active table is replaced.
911 The updated table will be activated when the *|end record is encountered.
912 However, to allow for a "double" update, where both the meid map and the
913 route table must be updated at the same time, the end indication on a
914 route table (new or update) may specifiy "hold" which indicates that meid
915 map entries are to follow and the updated route table should be held as
916 pending until the end of the meid map is received and validated.
918 CAUTION: we are assuming that there is a single route/meid map generator
919 and as such only one type of update is received at a time; in other
920 words, the sender cannot mix update records and if there is more than
921 one sender process they must synchronise to avoid issues.
924 For a RT update, we expect:
925 newrt | start | <table-id>
926 newrt | end | <count>
927 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
928 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
929 mse| <mtype>[,sender] | <sub-id> | %meid
932 For a meid map update we expect:
933 meid_map | start | <table-id>
934 meid_map | end | <count> | <md5-hash>
935 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
936 mme_del | <meid0> <meid1>...<meidn>
939 The pctx is our private context that must be used to send acks/status
940 messages back to the route manager. The regular ctx is the ctx that
941 the user has been given and thus that's where we have to hang the route
942 table we're working with.
944 If mbuf is given, and we need to ack, then we ack using the mbuf and a
945 return to sender call (allows route manager to use wh_call() to send
946 an update and rts is required to get that back to the right thread).
947 If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
// Parse one route-table record from route manager (or seed file) and apply it
// to the table being built on ctx. Record verbs: newrt, update (u), mse, rte,
// del, mme_*. pctx is the private context used to send acks; mbuf (if given)
// lets the ack return-to-sender. NOTE(review): this listing elides some lines
// (break/close-brace/guard lines), so flow comments below are best-effort.
950 static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
952 int ntoks; // number of tokens found in something
954 int grp; // group number
955 rtable_ent_t const* rte; // route table entry added
957 char* tok=NULL; // pointer into a token or string
964 if( ctx && ctx->snarf_rt_fd >= 0 ) { // if snarfing table as it arrives, write this puppy
965 write( ctx->snarf_rt_fd, buf, strlen( buf ) ); // NOTE(review): write() return values ignored; snarf is best-effort debug capture
966 write( ctx->snarf_rt_fd, "\n", 1 );
969 while( *buf && isspace( *buf ) ) { // skip leading whitespace
972 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
975 memset( tokens, 0, sizeof( tokens ) );
976 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
977 tokens[0] = clip( tokens[0] );
978 switch( *(tokens[0]) ) { // dispatch on first character of the verb
979 case 0: // ignore blanks
981 case '#': // and comment lines
984 case 'd': // del | [sender,]mtype | sub-id
985 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
990 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
994 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
995 ctx->new_rtable->updates++;
998 case 'n': // newrt|{start|end}
999 tokens[1] = clip( tokens[1] );
1000 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
1001 if( ctx && ctx->snarf_rt_fd >= 0 ) {
1002 cycle_snarfed_rt( ctx ); // make it available and open a new one
1006 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
1007 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1008 ctx->new_rtable->updates, tokens[2] );
1009 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
1010 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); // nack with the reason text built above
1011 uta_rt_drop( ctx->new_rtable ); // bad table is discarded, never rolled in
1012 ctx->new_rtable = NULL;
1017 if( ctx->new_rtable ) {
1018 roll_tables( ctx ); // roll active to old, and new to active with proper locking
1019 if( DEBUG > 1 || (vlevel > 1) ) {
1020 rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
1024 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1025 ctx->rtable_ready = 1; // route based sends can now happen
1026 ctx->flags |= CFL_FULLRT; // indicate we have seen a complete route table
1028 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
1029 ctx->new_rtable = NULL;
1031 } else { // start a new table.
1032 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
1033 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
1035 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1036 uta_rt_drop( ctx->new_rtable );
1037 ctx->new_rtable = NULL;
1040 if( ctx->table_id != NULL ) { // replace table id with the one from this start record (if any)
1041 free( ctx->table_id );
1044 ctx->table_id = strdup( clip( tokens[2] ) );
1046 ctx->table_id = NULL;
1049 ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new
1050 ctx->new_rtable->updates = 0; // init count of entries received
1052 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
1056 case 'm': // mse entry or one of the meid_ records
1057 if( strcmp( tokens[0], "mse" ) == 0 ) {
1058 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
1063 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
1067 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
1068 ctx->new_rtable->updates++;
1070 meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel ); // not mse; hand off meid_* records
1074 case 'r': // assume rt entry
1075 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
1079 ctx->new_rtable->updates++;
1080 if( ntoks > 3 ) { // assume new entry with subid last
1081 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1083 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
1087 case 'u': // update current table, not a total replacement
1088 if( ! (ctx->flags & CFL_FULLRT) ) { // we cannot update until we have a full table from route generator
1089 rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
1093 tokens[1] = clip( tokens[1] );
1094 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
1095 if( ctx->new_rtable == NULL ) { // update table not in progress
1098 if( ctx->snarf_rt_fd >= 0 ) {
1099 cycle_snarfed_rt( ctx ); // make it available and open a new one
1103 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
1104 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1105 ctx->new_rtable->updates, tokens[2] );
// NOTE(review): unlike the newrt end path above (orig line 1009), nothing in
// this branch formats wbuf before it is sent, so the nack may carry stale or
// uninitialised text -- confirm against the unelided source and add the
// matching snprintf if it is truly absent.
1106 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1107 uta_rt_drop( ctx->new_rtable );
1108 ctx->new_rtable = NULL;
1113 if( ctx->new_rtable ) {
1114 roll_tables( ctx ); // roll active to old, and new to active with proper locking
1115 if( DEBUG > 1 || (vlevel > 1) ) {
1116 rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1120 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1121 ctx->rtable_ready = 1; // route based sends can now happen
1123 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1124 ctx->new_rtable = NULL;
1126 } else { // start a new table.
1127 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
1128 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1129 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
1130 uta_rt_drop( ctx->new_rtable );
1131 ctx->new_rtable = NULL;
1135 if( ctx->table_id != NULL ) {
1136 free( ctx->table_id );
1138 ctx->table_id = strdup( clip( tokens[2] ) );
1141 ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table
1142 ctx->new_rtable->updates = 0; // init count of updates received
1144 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1149 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1156 This function attempts to open a static route table in order to create a 'seed'
1157 table during initialisation. The environment variable RMR_SEED_RT is expected
1158 to contain the necessary path to the file. If missing, or if the file is empty,
1159 no route table will be available until one is received from the generator.
1161 This function is probably most useful for testing situations, or extreme
1162 cases where the routes are static.
// Load a seed route table at initialisation. The file name comes from the
// ctx (cached) or the RMR_SEED_RT environment variable; records are fed one
// at a time through parse_rt_rec() with no private context (no acks possible).
1164 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1167 char* fbuf; // buffer with file contents
1168 char* rec; // start of the record
1169 char* eor; // end of the record
1170 int rcount = 0; // record count for debug
1172 if( (fname = ctx->seed_rt_fname) == NULL ) { // prefer cached name; fall back to environment
1173 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1177 ctx->seed_rt_fname = strdup( fname ); // cache for any future reload
1178 fname = ctx->seed_rt_fname;
1181 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
1182 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1186 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1187 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
1188 if( *eor == '\r' ) {
1189 *eor = '\n'; // will look like a blank line which is ok
1194 while( rec && *rec ) { // walk newline-terminated records
1196 if( (eor = strchr( rec, '\n' )) != NULL ) {
1199 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1200 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1205 parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
1210 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
1215 Callback driven for each thing in a symtab. We collect the pointers to those
1216 things for later use (cloning). The vthing_list parm is the thing_list_t
being filled; names[] and things[] are parallel arrays and nused counts both.
1218 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1221 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1225 if( thing == NULL ) { // nothing to collect for this entry
1226 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1230 tl->names[tl->nused] = name; // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1231 tl->things[tl->nused++] = thing; // save a reference to the thing
1233 if( tl->nused >= tl->nalloc ) { // grow now so the next call always has a free slot
1234 extend_things( tl ); // not enough allocated
1239 Called to delete a route table entry struct. We delete the array of endpoint
1240 pointers, but NOT the endpoints referenced as those are referenced from
1243 Route table entries can be concurrently referenced by multiple symtabs, so
1244 the actual delete happens only if decrementing the rte's ref count takes it
1245 to 0. Thus, it is safe to call this function across a symtab when cleaning up
1246 the symtab, or overlaying an entry.
1248 This function uses ONLY the pointer to the rte (thing) and ignores the other
1249 information that symtab foreach function passes (st, entry, and data) which
1250 means that it _can_ safely be used outside of the foreach setting. If
1251 the function is changed to depend on any of these three, then a stand-alone
1252 rte_cleanup() function should be added and referenced by this, and references
1253 to this outside of the foreach world should be changed.
1255 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1259 if( (rte = (rtable_ent_t *) thing) == NULL ) {
1260 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1265 if( rte->refs > 0 ) { // something still referencing, so it lives
1269 if( rte->rrgroups ) { // clean up the round robin groups
1270 for( i = 0; i < rte->nrrgroups; i++ ) {
1271 if( rte->rrgroups[i] ) {
1272 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
1273 free( rte->rrgroups[i] ); // but must free the rrg itself too
1278 free( rte->rrgroups ); // the array of group pointers goes last
1281 free( rte ); // finally, drop the potato
1285 Read an entire file into a buffer. We assume for route table files
1286 they will be smallish and so this won't be a problem.
1287 Returns a pointer to the buffer, or nil. Caller must free.
1288 Terminates the buffer with a nil character for string processing.
1290 If we cannot stat the file, we assume it's empty or missing and return
1291 an empty buffer, as opposed to a nil, so the caller can generate defaults
1292 or error if an empty/missing file isn't tolerated.
1294 static char* uta_fib( char const* fname ) {
1296 off_t fsize = 8192; // size of the file
1297 off_t nread; // number of bytes read
1299 char* buf; // input buffer
1301 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1302 if( fstat( fd, &stats ) >= 0 ) {
1303 if( stats.st_size <= 0 ) { // empty file
1307 fsize = stats.st_size; // stat ok, save the file size
1310 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
1314 if( fd < 0 ) { // didn't open or empty
1315 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) { // one byte buffer for the nil string
1323 // add a size limit check here
1325 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
1331 nread = read( fd, buf, fsize ); // single read; route tables are assumed small
1332 if( nread < 0 || nread > fsize ) { // failure of some kind
1334 errno = EFBIG; // likely too much to handle
1345 // --------------------- initialisation/creation ---------------------------------------------
1347 Create and initialise a route table; Returns a pointer to the table struct.
The table's gate and endpoint hash are shared: they point at the single
mutex and ephash owned by the context, not at per-table copies.
1349 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1355 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1359 memset( rt, 0, sizeof( *rt ) );
1361 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1366 rt->gate = ctx->rtgate; // single mutex needed for all route tables
1367 rt->ephash = ctx->ephash; // all route tables share a common endpoint hash
// NOTE(review): this re-initialises the shared gate mutex on every table
// creation; if another table is concurrently locking it this is undefined
// behaviour per POSIX -- confirm tables are only created from the single
// route-collector thread before first use of the gate.
1368 pthread_mutex_init( rt->gate, NULL );
1374 Clones one of the spaces in the given table.
1375 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1376 Space is the space in the old table to copy. Space 0 uses an integer key and
1377 references rte structs. All other spaces use a string key and reference endpoints.
1379 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1380 endpoint_t* ep; // an endpoint (ignore sonar complaint about const*)
1381 rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*)
1382 void* sst; // source symtab
1383 void* nst; // new symtab
1384 thing_list_t things; // things from the space to copy
1386 int free_on_err = 0;
1391 if( nrt == NULL ) { // make a new table if needed
1393 nrt = uta_rt_init( ctx );
1399 if( srt == NULL ) { // source was nil, just give back the new table
1403 things.nalloc = 2048;
1406 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1407 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1408 if( things.things == NULL || things.names == NULL ){ // bail, freeing whichever allocation succeeded
1409 if( things.things != NULL ) {
1410 free( things.things );
1412 if( things.names != NULL ) {
1413 free( things.names );
1417 rmr_sym_free( nrt->hash );
// FIX: was memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
// the outer sizeof yields sizeof(size_t) (typically 8), so only the first few
// bytes of the pointer array were zeroed. Zero the whole array, matching the
// names[] memset below.
1426 memset( things.things, 0, sizeof( void * ) * things.nalloc );
1427 memset( things.names, 0, sizeof( char * ) * things.nalloc );
1429 sst = srt->hash; // convenience pointers (src symtab)
1432 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
1433 if( things.error ) { // something happened and capture failed
1434 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1435 free( things.things );
1436 free( things.names );
1438 rmr_sym_free( nrt->hash );
1447 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
1448 for( i = 0; i < things.nused; i++ ) {
1449 if( space ) { // string key, epoint reference
1450 ep = (endpoint_t *) things.things[i];
1451 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
1453 rte = (rtable_ent_t *) things.things[i];
1454 rte->refs++; // rtes can be removed, so we track references
1455 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
1459 free( things.things );
1460 free( (void *) things.names );
1465 Given a destination route table (drt), clone from the source (srt) into it.
1466 If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1467 allocate the drt if that was nil too). If all is true (1), then we will clone both
1468 the MT and the ME spaces; otherwise only the ME space is cloned.
1470 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1471 endpoint_t* ep; // an endpoint
1472 rtable_ent_t* rte; // a route table entry
1479 drt = uta_rt_init( ctx ); // no destination given; make one
1485 drt->ephash = ctx->ephash; // all rts reference the same EP symtab
1486 rt_clone_space( ctx, srt, drt, RT_ME_SPACE ); // meid owners always cloned
1488 rt_clone_space( ctx, srt, drt, RT_MT_SPACE ); // mtype/subid entries only when all is set
1495 Prepares the "new" route table for populating. If the old_rtable is not nil, then
1496 we wait for its use count to reach 0. Then the table is cleared, and moved on the
1497 context to be referenced by the new pointer; the old pointer is set to nil.
1499 If the old table doesn't exist, then a new table is created and the new pointer is
1500 set to reference it.
1502 The ME namespace references endpoints which do not need to be released, therefore we
1503 do not need to run that portion of the table to deref like we do for the RTEs.
1505 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1514 if( (rt = ctx->old_rtable) != NULL ) { // recycle the drained old table if there is one
1515 ctx->old_rtable = NULL;
1517 pthread_mutex_lock( ctx->rtgate ); // ref_count is protected by the shared gate
1518 ref_count = rt->ref_count;
1519 pthread_mutex_unlock( ctx->rtgate );
1521 while( ref_count > 0 ) { // wait for all who are using to stop
1522 if( counter++ > 1000 ) {
1523 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" );
1527 usleep( 1000 ); // small sleep to yield the processor if that is needed
1529 pthread_mutex_lock( ctx->rtgate );
1530 ref_count = rt->ref_count;
1531 pthread_mutex_unlock( ctx->rtgate );
1534 if( rt->hash != NULL ) {
1535 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed
1536 rmr_sym_clear( rt->hash ); // clear all entries from the old table
1539 rt->error = 0; // table with errors can be here, so ensure clear before attempt to load
// FIX: removed pthread_mutex_destroy( ctx->rtgate ) which was here. The rtgate
// is the single mutex shared by ALL route tables (see uta_rt_init) and is still
// locked concurrently by get_rt()/release_rt() on the active table; destroying
// an in-use mutex is undefined behaviour and left later lock calls operating on
// a destroyed object.
1545 rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
1546 if( rt != NULL ) { // very small chance for nil, but not zero, so test
1547 rt->ref_count = 0; // take no chances; ensure it's 0!
1549 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1550 rt = uta_rt_init( ctx ); // must have something, but mark it in error state
1559 Given a name, find the endpoint struct in the provided route table.
Returns nil when the table, its endpoint hash, or the name is missing/empty;
lookup is done in the shared endpoint hash (class 1).
1561 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1563 if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1567 return rmr_sym_get( rt->ephash, ep_name, 1 );
1571 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1572 Does NOT destroy the gate as it's a common gate for ALL route tables.
Endpoints are NOT freed either; they live in the shared ephash.
1574 static void uta_rt_drop( route_table_t* rt ) {
1579 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1580 rmr_sym_free( rt->hash ); // free all of the hash related data
1585 Look up and return the pointer to the endpoint struct matching the given name.
1586 If not in the hash, a new endpoint is created, added to the hash. Should always
return a pointer; nil only on bad parms or malloc failure.
1589 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1592 if( !rt || !ep_name || ! *ep_name ) {
1593 rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1598 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1599 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1600 rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1605 ep->notify = 1; // show notification on first connection failure
1606 ep->open = 0; // not connected
1607 ep->addr = uta_h2ip( ep_name ); // translate name to dot address
1608 ep->name = strdup( ep_name );
1609 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1610 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) ); // zero the send counters
1612 rmr_sym_put( rt->ephash, ep_name, 1, ep ); // endpoints live in the shared class-1 hash
1620 Given a session id and message type build a key that can be used to look up the rte in the route
1621 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
The key is sub_id in the high 32 bits and mtype in the low 32; an unset sub_id
maps to all-ones in the high half.
1623 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1626 if( sub_id == UNSET_SUBID ) {
// NOTE(review): a negative mtype would sign-extend into the high half here;
// presumably mtype is always >= 0 -- confirm with callers.
1627 key = 0xffffffff00000000 | mtype;
1629 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1636 Given a route table and meid string, find the owner (if known). Returns a pointer to
1637 the endpoint struct or nil. Lookup is in the table's own hash, ME space.
1639 static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
1640 endpoint_t const* ep; // the ep we found in the hash
1642 if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1646 return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1650 This returns a pointer to the currently active route table and ups
1651 the reference count so that the route table is not freed while it
1652 is being used. The caller MUST call release_rt() when finished
1655 Care must be taken: the ctx->rtable pointer _could_ change during the time
1656 between the release of the lock and the return. Therefore we MUST grab
1657 the current pointer when we have the lock so that if it does we don't
1658 return a pointer to the wrong table.
1660 This will return NULL if there is no active table.
1662 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1663 route_table_t* rrt; // return value
1665 if( ctx == NULL || ctx->rtable == NULL ) {
1669 pthread_mutex_lock( ctx->rtgate ); // must hold lock to bump use
1670 rrt = ctx->rtable; // must stash the pointer while we hold lock
1672 pthread_mutex_unlock( ctx->rtgate );
1674 return rrt; // pointer we upped the count with
1678 This will "release" the route table by reducing the use counter
1679 in the table. The table may not be freed until the counter reaches
1680 0, so it's imperative that the pointer be "released" when it is
1681 fetched by get_rt(). Once the caller has released the table it
1682 may not safely use the pointer that it had.
1684 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1685 if( ctx == NULL || rt == NULL ) {
1689 pthread_mutex_lock( ctx->rtgate ); // must hold lock
1690 if( rt->ref_count > 0 ) { // something smells if it's already 0, don't do anything if it is
1693 pthread_mutex_unlock( ctx->rtgate );
1696 int isspace_with_fence(int c) {
1698 return isspace( c );