3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
50 #include <RIC_message_types.h> // needed for route manager messages
56 Passed to a symtab foreach callback to construct a list of pointers from
// Accumulator handed to a symtab for-each callback to collect entry pointers.
// NOTE(review): remaining members (counts, pointer arrays) are elided in this view;
// see extend_things() below which references nalloc/things/names.
59 typedef struct thing_list {
60 int error; // if a realloc failed, this will be set
67 // ---- debugging/testing -------------------------------------------------------------------------
70 Dump some stats for an endpoint in the RT. This is generally called to
71 verify endpoints after a table load/change.
73 This is called by the for-each mechanism of the symtab and the prototype is
74 fixe; we don't really use some of the parms, but have dummy references to
75 keep sonar from complaining.
// Symtab for-each callback: log one endpoint's name and open state, and bump
// the caller supplied counter when one is given. st/entry/name are referenced
// only in the log call to keep static analysis quiet about unused parms.
// NOTE(review): several lines (branch bodies/else arms) are elided in this view.
77 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
81 if( (ep = (endpoint_t *) thing) == NULL ) {
85 if( (counter = (int *) vcounter) != NULL ) {
// The "nil counter" log presumably sits in the elided else arm — confirm against full source.
88 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
92 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
96 Called to count meid entries in the table. The meid points to an 'owning' endpoint
97 so we can list what we find
99 See note in ep_stats about dummy refs.
// Symtab for-each callback over the meid namespace: each meid maps to its
// 'owning' endpoint; logs meid -> owner and bumps the counter if supplied.
// NOTE(review): branch bodies are elided in this view.
101 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
105 if( (ep = (endpoint_t *) thing) == NULL ) {
109 if( (counter = (int *) vcounter) != NULL ) {
// "nil counter" log presumably in the elided else arm — confirm against full source.
112 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
115 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
119 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
120 the 'source' information and is added to each message.
122 See note above about dummy references.
// Symtab for-each callback: emit one INFO line of send counters for an endpoint.
// vid is the caller's "source" id string included in each line (src=).
// Counters: good sends, total fails (hard+soft), hard fails, soft/transient fails.
124 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
128 if( (ep = (endpoint_t *) thing) == NULL ) {
129 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
133 if( (id = (char *) vid) == NULL ) {
137 rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
138 (long long) time( NULL ),
// middle arguments (src/target/open) elided in this view
142 ep->scounts[EPSC_GOOD],
143 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
144 ep->scounts[EPSC_FAIL],
145 ep->scounts[EPSC_TRANS] );
149 Dump stats for a route entry in the table.
// Symtab for-each callback: log one route table entry. The 64 bit key packs
// the subscription id in the high word and message type in the low 16 bits,
// as shown by the mtype/sid extraction below.
151 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
153 rtable_ent_t const* rte; // thing is really an rte
157 if( (rte = (rtable_ent_t *) thing) == NULL ) {
158 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
162 if( (counter = (int *) vcounter) != NULL ) {
166 mtype = rte->key & 0xffff; // low 16 bits carry the message type
167 sid = (int) (rte->key >> 32); // high word carries the subscription id
169 rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
173 Given a route table, cause some stats to be spit out.
// Walk a route table and emit debug statistics: endpoint list, message-type
// entries, and the meid map, each with a running count via the callbacks above.
// NOTE(review): the malloc result check and the counter resets between passes
// are elided in this view — confirm against full source.
175 static void rt_stats( route_table_t* rt ) {
179 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
183 counter = (int *) malloc( sizeof( int ) );
185 rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
186 rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
187 rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
188 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
190 rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
192 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
193 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
195 rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
197 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
198 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
204 Given a route table, cause endpoint counters to be written to stderr. The id
205 parm is written as the "source" in the output.
// Write per-endpoint send counters (via ep_counts) for every endpoint in the
// table; id is passed through as the "source" tag in each output line.
207 static void rt_epcounts( route_table_t* rt, char* id ) {
209 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
213 rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table
// Debug helper: dump stats for the old (draining) route table if one exists,
// then for the currently active table.
217 static void dump_tables( uta_ctx_t *ctx ) {
218 if( ctx->old_rtable != NULL ) {
219 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
220 rt_stats( ctx->old_rtable );
221 // else arm (elided braces in this view)
222 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
224 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
225 rt_stats( ctx->rtable );
228 // ------------ route manager communication -------------------------------------------------
230 Send a request for a table update to the route manager. Updates come in
231 async, so send and go.
233 pctx is the private context for the thread; ctx is the application context
234 that we need to be able to send the application ID in case rt mgr needs to
235 use it to identify us.
237 Returns 0 if we were not able to send a request.
// Fire-and-forget request to the route manager for a table update, sent over
// the wormhole (rtg_whid) using the thread-private context pctx. The payload
// carries our application name and a timestamp. Returns 0 when the request
// could not be sent (per the contract stated in the header comment above).
239 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
243 if( ctx->rtg_whid < 0 ) {
247 smsg = rmr_alloc_msg( pctx, 1024 );
// NOTE(review): smsg nil-check after alloc is elided here — confirm against full source.
249 smsg->mtype = RMRRM_REQ_TABLE;
// NOTE(review): time(NULL) printed with %ld — time_t width may not match long on all
// platforms; the full source may cast, confirm.
251 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
252 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
253 smsg->len = strlen( smsg->payload ) + 1;
255 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg ); // send returns the mbuf with state set
256 if( (state = smsg->state) != RMR_OK ) {
257 rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
258 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
262 rmr_free_msg( smsg );
269 Send an ack to the route table manager for a table ID that we are
270 processing. State is 1 for OK, and 0 for failed. Reason might
271 be populated if we know why there was a failure.
273 Context should be the PRIVATE context that we use for messages
274 to route manager and NOT the user's context.
276 If a message buffer is passed we use that and use return to sender
277 assuming that this might be a response to a call and that is needed
278 to send back to the proper calling thread. If msg is nil, we allocate
// Ack/nack a table id back to the route manager. If the caller supplied an
// mbuf we reuse it and return-to-sender (supports wh_call on the manager side);
// otherwise a fresh message is allocated and sent over the wormhole. state is
// RMR_OK for success; reason (may be nil) is appended to the payload.
// ctx must be the PRIVATE route-manager context, not the user's.
281 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
283 int payload_size = 1024;
285 if( ctx == NULL || ctx->rtg_whid < 0 ) {
289 if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
294 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
296 // else arm: no inbound mbuf, allocate our own (branch structure elided in this view)
297 smsg = rmr_alloc_msg( ctx, payload_size );
301 smsg->mtype = RMRRM_TABLE_STATE;
303 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
304 table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
306 smsg->len = strlen( smsg->payload ) + 1;
308 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
// rts path when responding to a call; wh_send path otherwise (selector elided in this view)
310 smsg = rmr_rts_msg( ctx, smsg );
312 smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
// note: state parm is reused here to capture the send result
314 if( (state = smsg->state) != RMR_OK ) {
315 rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
316 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
321 rmr_free_msg( smsg ); // if not our message we must free the leftovers
326 // ---- alarm generation --------------------------------------------------------------------------
329 Given the user's context (not the thread private context) look to see if the application isn't
330 working fast enough and we're dropping messages. If the drop counter has changed since the last
331 peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
332 set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
334 The private context is what we use to send so as not to interfere with the user flow.
// Periodic check of the user context's drop counter (uctx->dcount). Raises a
// drops alarm (via the private context pctx) when the counter advances while
// no alarm is up; clears it after the counter has been stable past the
// ok2clear deadline. All state is kept in function-static variables, so this
// is intended to be called from a single (route table collector) thread.
336 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
337 static int alarm_raised = 0;
// NOTE(review): ok2clear stores a time(NULL)+60 value in an int — narrower than
// time_t on most platforms; confirm intended.
338 static int ok2clear = 0; // time that we can clear
339 static int lastd = 0; // the last counter value so we can compute delta
340 static int prob_id = 0; // problem ID we assume alarm manager handles dups between processes
342 rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
343 if( ! alarm_raised ) {
344 if( uctx->dcount - lastd == 0 ) { // not actively dropping, ok to do nothing
// drops observed and no alarm up: raise it
349 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
350 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
// alarm already raised path:
352 if( uctx->dcount - lastd != 0 ) { // still dropping or dropping again; we've alarmed so nothing to do
353 lastd = uctx->dcount;
354 ok2clear = 0; // reset the timer
358 if( ok2clear == 0 ) { // first round where not dropping
359 ok2clear = time( NULL ) + 60; // we'll clear the alarm in 60s
361 if( time( NULL ) > ok2clear ) { // things still stable after expiry
362 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
364 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
371 // ---- utility -----------------------------------------------------------------------------------
373 Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
374 must be at the start of a word (i.e. must be immediately preceded by whitespace).
// Trim leading/trailing whitespace from buf in place and truncate at a
// trailing '#' comment, but only when the '#' starts a word (is preceded by
// whitespace). Returns a pointer into the (possibly advanced) buffer.
376 static char* clip( char* buf ) {
379 while( *buf && isspace( *buf ) ) { // skip leading whitespace
383 if( (tok = strchr( buf, '#' )) != NULL ) {
// '#' as the very first character: leave it for the caller's comment handling
385 return buf; // just push back; leading comment sym handled there
388 if( isspace( *(tok-1) ) ) {
393 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
400 This accepts a pointer to a nil terminated string, and ensures that there is a
401 newline as the last character. If there is not, a new buffer is allocated and
402 the newline is added. If a new buffer is allocated, the buffer passed in is
403 freed. The function returns a pointer which the caller should use, and must
404 free. In the event of an error, a nil pointer is returned.
// Ensure the nil-terminated buffer ends with '\n'. If it already does, the
// original pointer is returned; otherwise a new buffer is allocated with the
// newline appended (and per the header comment above, the original is freed).
// Caller owns and must free the returned pointer; nil on error.
406 static char* ensure_nlterm( char* buf ) {
414 nb = buf; // default to returning original as is
// single-character buffer case (length checks elided in this view):
421 if( *buf != '\n' ) { // not a newline; realloc
422 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
423 nb = strdup( " \n" );
// general case, len > 1:
430 if( buf[len-1] != '\n' ) { // not newline terminated, realloc
431 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
432 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
433 memcpy( nb, buf, len );
434 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
446 Roll the new table into the active and the active into the old table. We
447 must have the lock on the active table to do this. It's possible that there
448 is no active table (first load), so we have to account for that (no locking).
// Promote the table under construction: active -> old (left to drain), new ->
// active. Readers are excluded by holding rtgate while the pointers swap; on
// the very first load there is no active table and no locking is needed.
// A nil or error-flagged new table is not rolled in; it is parked in
// old_rtable (presumably so its references get cleaned up — confirm).
450 static void roll_tables( uta_ctx_t* ctx ) {
452 if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
453 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
454 ctx->old_rtable = ctx->new_rtable;
455 ctx->new_rtable = NULL;
459 if( ctx->rtable != NULL ) { // initially there isn't one, so must check!
460 pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active
461 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
462 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
463 pthread_mutex_unlock( ctx->rtgate );
464 // else: first table ever loaded, nothing to displace
465 ctx->old_rtable = NULL; // ensure there isn't an old reference
466 ctx->rtable = ctx->new_rtable; // make new the active one
469 ctx->new_rtable = NULL;
473 Given a thing list, extend the array of pointers by 1/2 of the current
474 number allocated. If we cannot realloc an array, then we set the error
475 flag. Unlikely, but will prevent a crash, AND will prevent us from
476 trying to use the table since we couldn't capture everything.
// Grow a thing_list's parallel pointer arrays (things/names) by 50%. On
// allocation failure the list's error flag is set (per the header comment
// above) so the partially captured table will not be used.
// NOTE(review): the free of the old arrays and the error-flag assignment are
// elided in this view — confirm against full source.
478 static void extend_things( thing_list_t* tl ) {
487 old_alloc = tl->nalloc; // capture current things
488 old_things = tl->things;
489 old_names = tl->names;
491 tl->nalloc += tl->nalloc/2; // new allocation size
493 tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc ); // allocate larger arrays
494 tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
496 if( tl->things == NULL || tl->names == NULL ){ // if either failed, then set error
// copy existing entries into the larger arrays
501 memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
502 memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
508 // ------------ entry update functions ---------------------------------------------------------------
510 Given a message type create a route table entry and add to the hash keyed on the
511 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
512 is the number of group slots to allocate in the entry.
// Create a route table entry for key (packed subid+mtype) and map it into the
// table's hash, allocating nrrgroups round-robin group slots (0 is legal for
// %meid entries). An existing entry under the same key is unmapped and its
// reference dropped via del_rte before the new one is installed.
// Returns the new entry, or nil on allocation failure.
514 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
516 rtable_ent_t* old_rte; // entry which was already in the table for the key
522 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
523 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
526 memset( rte, 0, sizeof( *rte ) );
530 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
535 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
539 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
540 // else: no groups requested
541 rte->rrgroups = NULL;
544 rte->nrrgroups = nrrgroups;
546 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
547 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
550 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
552 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
557 This accepts partially parsed information from an rte or mse record sent by route manager or read from
559 ts_field is the msg-type,sender field
560 subid is the integer subscription id
561 rr_field is the endpoint information for round robening message over
563 If all goes well, this will add an RTE to the table under construction.
565 The ts_field is checked to see if we should ingest this record. We ingest if one of
567 there is no sender info (a generic entry for all)
568 there is sender and our host:port matches one of the senders
569 the sender info is an IP address that matches one of our IP addresses
// Add one rte/mse record to the table under construction. The record is
// ingested only when the sender portion of ts_field is absent, names us, or
// lists one of our IP addresses (see the header comment above). Round-robin
// groups are split on ';', members on ',', and references to ourself are
// stripped so we never route to our own process.
571 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
572 rtable_ent_t* rte; // route table entry added
575 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
578 int ngtoks; // number of tokens in the group list
579 int grp; // index into group list
580 int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
581 int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
583 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
584 rr_field = clip( rr_field );
586 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
587 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
588 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
590 key = build_rt_key( subid, atoi( ts_field ) );
592 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
594 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
595 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
596 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
// NOTE(review): uta_add_rte can return nil on malloc failure and rte is
// dereferenced on the next line — nil check may be elided in this view; confirm.
598 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
599 rte->mtype = atoi( ts_field ); // capture mtype for debugging
601 for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
602 int i; // avoid sonar grumbling by defining this here
604 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
605 for( i = 0; i < ntoks; i++ ) {
606 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
607 if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
608 uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
613 cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
// record did not apply to us / malformed sender field:
620 if( DEBUG || (vlevel > 2) ) {
621 rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
627 Trash_entry takes a partially parsed record from the input and
628 will delete the entry if the sender,mtype matches us or it's a
629 generic mtype. The reference in the new table is removed and the
630 refcounter for the actual rte is decreased. If that ref count is
631 0 then the memory is freed (handled by the del_rte call).
// Delete an rte from the table under construction when the del record applies
// to us (no sender list, our name, or our IP in ts_field). The hash reference
// is removed and the entry's ref count dropped via del_rte (freed at ref 0).
633 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
634 rtable_ent_t* rte; // route table entry to be 'deleted'
637 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
// guard: nothing to delete from if no new table is being built
640 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
644 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
646 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
647 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
648 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
650 key = build_rt_key( subid, atoi( ts_field ) );
651 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
// found-entry path (rte nil check elided in this view):
653 if( DEBUG || (vlevel > 1) ) {
654 rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
656 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
657 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
// not-found path:
659 if( DEBUG || (vlevel > 1) ) {
660 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
// record did not apply to us:
664 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
668 // -------------------------- parse functions --------------------------------------------------
671 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
672 the 'owner' which should be the dns name or IP address of an endpoint
673 the meid_list is a space separated list of me IDs
675 This function assumes the caller has vetted the pointers as needed.
677 For each meid in the list, an entry is pushed into the hash which references the owner
678 endpoint such that when the meid is used to route a message it references the endpoint
// Process an mme_ar record: for each space separated meid in meid_list, map
// the meid (RT_ME_SPACE) to the owner's endpoint, creating the endpoint entry
// if needed. Existing meid mappings are replaced (rmr_sym_put semantics).
681 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
687 endpoint_t* ep; // endpoint struct for the owner
689 owner = clip( owner ); // ditch extra whitespace and trailing comments
690 meid_list = clip( meid_list );
692 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
693 for( i = 0; i < ntoks; i++ ) {
694 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
695 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
696 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
698 rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
704 Given the tokens from an mme_del, delete the listed meid entries from the new
705 table. The list is a space separated list of meids.
707 The meids in the hash reference endpoints which are never deleted and so
708 the only thing that we need to do here is to remove the meid from the hash.
710 This function assumes the caller has vetted the pointers as needed.
// Process an mme_del record: remove each space separated meid from the new
// table's hash. The referenced endpoints are owned elsewhere and are never
// freed here (see the header comment above).
712 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
718 if( rtab->hash == NULL ) {
722 meid_list = clip( meid_list );
724 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
725 for( i = 0; i < ntoks; i++ ) {
726 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
727 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
732 Parse a partially parsed meid record. Tokens[0] should be one of:
733 meid_map, mme_ar, mme_del.
735 pctx is the private context needed to return an ack/nack using the provided
736 message buffer with the route managers address info.
// Dispatch a partially tokenised meid record: meid_map start/end bracketing a
// batch of mme_ar (add/replace) and mme_del records. start clones the current
// table; end validates the record count, rolls the new table active, and
// acks/nacks the route manager via send_rt_ack over pctx.
738 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
741 if( tokens == NULL || ntoks < 1 ) {
742 return; // silent but should never happen
745 if( ntoks < 2 ) { // must have at least two for any valid request record
746 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
750 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
751 tokens[1] = clip( tokens[1] );
752 if( *(tokens[1]) == 's' ) { // "start"
753 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
754 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
755 uta_rt_drop( ctx->new_rtable );
756 ctx->new_rtable = NULL;
757 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as and never made it
// capture the table id (token 3) if supplied
760 if( ctx->table_id != NULL ) {
761 free( ctx->table_id );
764 ctx->table_id = strdup( clip( tokens[2] ) );
766 ctx->table_id = NULL;
769 ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid)
770 ctx->new_rtable->mupdates = 0;
772 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
774 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
775 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
776 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
777 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
778 ctx->new_rtable->mupdates, tokens[2] );
// NOTE(review): nack message reports ->updates while the comparison above used
// ->mupdates — looks like it should be mupdates; confirm against full source.
779 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
780 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
781 uta_rt_drop( ctx->new_rtable );
782 ctx->new_rtable = NULL;
786 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
789 if( ctx->new_rtable ) {
790 roll_tables( ctx ); // roll active to old, and new to active with proper locking
791 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
792 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
// verbose dump of old/new tables after the roll:
795 if( ctx->old_rtable != NULL ) {
796 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
797 rt_stats( ctx->old_rtable );
799 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
801 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
802 rt_stats( ctx->rtable );
// end seen with no start:
805 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
806 ctx->new_rtable = NULL;
// any mme_* record requires an update to be in progress
814 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
815 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
819 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
820 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
821 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
824 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
825 ctx->new_rtable->mupdates++;
829 if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated
830 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
831 ctx->new_rtable->mupdates++;
837 This will close the current table snarf file (in *.inc) and open a new one.
838 The current one is renamed. The final file name is determined by the setting of
839 RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
840 an additional extension of .snarf. If neither seed or snarf environment vars are
841 set then this does nothing.
843 If this is called before the tmp snarf file is opened, then this just opens the file.
// Rotate the route-table capture ("snarf") file: close the working *.inc
// file, rename it over the final stash name (ENV_STASH_RT, or ENV_SEED_RT
// with a .stash suffix), then open a fresh *.inc for the next table. Does
// nothing when neither environment variable is set. On the first call there
// is no file to close/unlink, so those warnings are squelched via ok2warn.
845 static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
846 static int ok2warn = 0; // some warnings squelched on first call
848 char* seed_fname; // the filename from env
849 char tfname[512]; // temp fname
850 char wfname[512]; // working buffer for filename
851 char* snarf_fname = NULL; // prevent overlay of the static table if snarf_rt not given
857 if( (snarf_fname = getenv( ENV_STASH_RT )) == NULL ) { // specific place to stash the rt not given
858 if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) { // no seed, we leave in the default file
859 memset( wfname, 0, sizeof( wfname ) );
860 snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
861 snarf_fname = wfname;
865 if( snarf_fname == NULL ) {
866 rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
870 memset( tfname, 0, sizeof( tfname ) );
871 snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname ); // must ensure tmp file is moveable
873 if( ctx->snarf_rt_fd >= 0 ) {
874 char* msg= "### captured from route manager\n";
// NOTE(review): write() result is unchecked here — best effort trailer; confirm intended.
875 write( ctx->snarf_rt_fd, msg, strlen( msg ) );
876 if( close( ctx->snarf_rt_fd ) < 0 ) {
877 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
881 if( unlink( snarf_fname ) < 0 && ok2warn ) { // first time through this can fail and we ignore it
882 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
885 if( rename( tfname, snarf_fname ) ) { // nonzero return == rename failed
886 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
888 rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
// open the next working capture file
893 ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
894 if( ctx->snarf_rt_fd < 0 ) {
895 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
897 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
902 Parse a single record received from the route table generator, or read
903 from a static route table file. Start records cause a new table to
904 be started (if a partial table was received it is discarded). Table
905 entry records are added to the currently 'in progress' table, and an
906 end record causes the in progress table to be finalised and the
907 currently active table is replaced.
909 The updated table will be activated when the *|end record is encountered.
910 However, to allow for a "double" update, where both the meid map and the
911 route table must be updated at the same time, the end indication on a
912 route table (new or update) may specify "hold" which indicates that meid
913 map entries are to follow and the updated route table should be held as
914 pending until the end of the meid map is received and validated.
916 CAUTION: we are assuming that there is a single route/meid map generator
917 and as such only one type of update is received at a time; in other
918 words, the sender cannot mix update records and if there is more than
919 one sender process they must synchronise to avoid issues.
922 For a RT update, we expect:
923 newrt | start | <table-id>
924 newrt | end | <count>
925 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
926 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
927 mse| <mtype>[,sender] | <sub-id> | %meid
930 For a meid map update we expect:
931 meid_map | start | <table-id>
932 meid_map | end | <count> | <md5-hash>
933 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
934 mme_del | <meid0> <meid1>...<meidn>
937 The pctx is our private context that must be used to send acks/status
938 messages back to the route manager. The regular ctx is the ctx that
939 the user has been given and thus that's where we have to hang the route
940 table we're working with.
942 If mbuf is given, and we need to ack, then we ack using the mbuf and a
943 return to sender call (allows route manager to use wh_call() to send
944 an update and rts is required to get that back to the right thread).
945 If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
/*
	Parse a single route-table record (pipe separated fields) and apply it to the
	table being built/updated on ctx. See the block comment above for the roles of
	pctx (private context used for acks back to route manager) and mbuf (return to
	sender ack path). NOTE(review): portions of this function are elided in this
	view; brace/branch structure between visible lines is not shown here.
*/
static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
	int ntoks;											// number of tokens found in something
	int grp;											// group number
	rtable_ent_t const*	rte;							// route table entry added
	char* tok;											// pointer into a token or string

	if( ctx && ctx->snarf_rt_fd >= 0 ) {				// if snarfing table as it arrives, write this puppy
		write( ctx->snarf_rt_fd, buf, strlen( buf ) );	// NOTE(review): write() result ignored -- best effort capture; confirm intended
		write( ctx->snarf_rt_fd, "\n", 1 );

	while( *buf && isspace( *buf ) ) {					// skip leading whitespace

	for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );	// trim trailing spaces too

	memset( tokens, 0, sizeof( tokens ) );
	if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
		tokens[0] = clip( tokens[0] );
		switch( *(tokens[0]) ) {
			case 0:												// ignore blanks

			case '#':											// and comment lines

			case 'd':											// del | [sender,]mtype | sub-id
				if( ! ctx->new_rtable ) {						// bad sequence, or malloc issue earlier; ignore silently

				if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );

				trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
				ctx->new_rtable->updates++;

			case 'n':											// newrt|{start|end}
				tokens[1] = clip( tokens[1] );
				if( strcmp( tokens[1], "end" ) == 0 ) {			// wrap up the table we were building
					if( ctx && ctx->snarf_rt_fd >= 0 ) {
						cycle_snarfed_rt( ctx );				// make it available and open a new one

					if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {	// count they added didn't match what we received
						rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
							ctx->new_rtable->updates, tokens[2] );
						snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
						send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );	// nack: record count mismatch
						uta_rt_drop( ctx->new_rtable );
						ctx->new_rtable = NULL;

					if( ctx->new_rtable ) {
						roll_tables( ctx );						// roll active to old, and new to active with proper locking
						if( DEBUG > 1 || (vlevel > 1) ) {
							rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );

						send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
						ctx->rtable_ready = 1;					// route based sends can now happen
						ctx->flags |= CFL_FULLRT;				// indicate we have seen a complete route table

						if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
						ctx->new_rtable = NULL;

				} else {										// start a new table.
					if( ctx->new_rtable != NULL ) {				// one in progress? this forces it out
						send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );	// nack the one that was pending as end never made it

						if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
						uta_rt_drop( ctx->new_rtable );
						ctx->new_rtable = NULL;

					if( ctx->table_id != NULL ) {				// replace the cached table id with the one from this message
						free( ctx->table_id );

					ctx->table_id = strdup( clip( tokens[2] ) );

					ctx->table_id = NULL;

					ctx->new_rtable = prep_new_rt( ctx, SOME );	// wait for old table to drain and shift it back to new
					ctx->new_rtable->updates = 0;				// init count of entries received

					if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );

			case 'm':											// mse entry or one of the meid_ records
				if( strcmp( tokens[0], "mse" ) == 0 ) {
					if( ! ctx->new_rtable ) {					// bad sequence, or malloc issue earlier; ignore silently

					if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );

					build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
					ctx->new_rtable->updates++;

					meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );	// not mse; hand off meid_* records

			case 'r':											// assume rt entry
				if( ! ctx->new_rtable ) {						// bad sequence, or malloc issue earlier; ignore silently

				ctx->new_rtable->updates++;
				if( ntoks > 3 ) {								// assume new entry with subid last
					build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );

					build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );			// old school entry has no sub id

			case 'u':											// update current table, not a total replacement
				if( ! (ctx->flags & CFL_FULLRT) ) {				// we cannot update until we have a full table from route generator
					rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );

				tokens[1] = clip( tokens[1] );
				if( strcmp( tokens[1], "end" ) == 0 ) {			// wrap up the table we were building
					if( ctx->new_rtable == NULL ) {				// update table not in progress

					if( ctx->snarf_rt_fd >= 0 ) {
						cycle_snarfed_rt( ctx );				// make it available and open a new one

					if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {	// count they added didn't match what we received
						rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
							ctx->new_rtable->updates, tokens[2] );
						send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );	// NOTE(review): wbuf filled earlier? 'n' case formats it first -- confirm
						uta_rt_drop( ctx->new_rtable );
						ctx->new_rtable = NULL;

					if( ctx->new_rtable ) {
						roll_tables( ctx );						// roll active to old, and new to active with proper locking
						if( DEBUG > 1 || (vlevel > 1) ) {
							rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );

						send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
						ctx->rtable_ready = 1;					// route based sends can now happen

						if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
						ctx->new_rtable = NULL;

				} else {										// start a new table.
					if( ctx->new_rtable != NULL ) {				// one in progress? this forces it out
						if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
						send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );	// nack the one that was pending as end never made it
						uta_rt_drop( ctx->new_rtable );
						ctx->new_rtable = NULL;

					if( ctx->table_id != NULL ) {				// replace the cached table id
						free( ctx->table_id );

					ctx->table_id = strdup( clip( tokens[2] ) );

					ctx->new_rtable = prep_new_rt( ctx, ALL );	// start with a copy of everything in the live table
					ctx->new_rtable->updates = 0;				// init count of updates received

					if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );

				if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1154 This function attempts to open a static route table in order to create a 'seed'
1155 table during initialisation. The environment variable RMR_SEED_RT is expected
1156 to contain the necessary path to the file. If missing, or if the file is empty,
1157 no route table will be available until one is received from the generator.
1159 This function is probably most useful for testing situations, or extreme
1160 cases where the routes are static.
/*
	Read the static (seed) route table named by ctx->seed_rt_fname, or by the
	ENV_SEED_RT environment variable when no name is cached, and feed each record
	to parse_rt_rec(). No private context is passed, so no acks are generated.
	See the block comment above. NOTE(review): some lines elided in this view.
*/
static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
	char* fbuf;						// buffer with file contents
	char* rec;						// start of the record
	char* eor;						// end of the record
	int rcount = 0;					// record count for debug

	if( (fname = ctx->seed_rt_fname) == NULL ) {		// cached name takes precedence over the environment
		if( (fname = getenv( ENV_SEED_RT )) == NULL ) {

	ctx->seed_rt_fname = strdup( fname );				// cache the name on the context for later use
	fname = ctx->seed_rt_fname;

	if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {			// read file into a single buffer (nil terminated string)
		rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );

	if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
	for( eor = fbuf; *eor; eor++ ) {					// fix broken systems that use \r or \r\n to terminate records
		if( *eor == '\r' ) {
			*eor = '\n';								// will look like a blank line which is ok

	while( rec && *rec ) {								// walk the buffer record by record

		if( (eor = strchr( rec, '\n' )) != NULL ) {

			rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
			rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );

		parse_rt_rec( ctx, NULL, rec, vlevel, NULL );	// no pvt context as we can't ack

	if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
1213 Callback driven for each thing in a symtab. We collect the pointers to those
1214 things for later use (cloning).
/*
	Symtab foreach callback: append the thing (and its name/key) to the
	thing_list passed via vthing_list so the entries can be cloned later.
	Extends the list when the allocation is exhausted.
*/
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {

	if( (tl = (thing_list_t *) vthing_list) == NULL ) {		// nothing to collect into; bail

	if( thing == NULL ) {
		rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );	// dummy ref for sonar

	tl->names[tl->nused] = name;				// the name/key (space 0 uses int keys, so name can be nil and that is OK)
	tl->things[tl->nused++] = thing;			// save a reference to the thing

	if( tl->nused >= tl->nalloc ) {
		extend_things( tl );					// not enough allocated
1237 Called to delete a route table entry struct. We delete the array of endpoint
1238 pointers, but NOT the endpoints referenced as those are referenced from
1241 Route table entries can be concurrently referenced by multiple symtabs, so
1242 the actual delete happens only if decrementing the rte's ref count takes it
1243 to 0. Thus, it is safe to call this function across a symtab when cleaning up
1244 the symtab, or overlaying an entry.
1246 This function uses ONLY the pointer to the rte (thing) and ignores the other
1247 information that symtab foreach function passes (st, entry, and data) which
means that it _can_ safely be used outside of the foreach setting. If
the function is changed to depend on any of these three, then a stand-alone
rte_cleanup() function should be added and referenced by this, and references
1251 to this outside of the foreach world should be changed.
/*
	Symtab foreach callback: delete a route table entry struct once its
	reference count reaches 0 (see block comment above). Frees the round robin
	group structs and their endpoint pointer arrays, but never the endpoints
	themselves as those are shared across tables.
*/
static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {

	if( (rte = (rtable_ent_t *) thing) == NULL ) {
		rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );	// dummy ref for sonar

	if( rte->refs > 0 ) {						// something still referencing, so it lives

	if( rte->rrgroups ) {						// clean up the round robin groups
		for( i = 0; i < rte->nrrgroups; i++ ) {
			if( rte->rrgroups[i] ) {
				free( rte->rrgroups[i]->epts );	// ditch list of endpoint pointers (end points are reused; don't trash them)
				free( rte->rrgroups[i] );		// but must free the rrg itself too

		free( rte->rrgroups );

	free( rte );								// finally, drop the potato
1283 Read an entire file into a buffer. We assume for route table files
1284 they will be smallish and so this won't be a problem.
1285 Returns a pointer to the buffer, or nil. Caller must free.
1286 Terminates the buffer with a nil character for string processing.
1288 If we cannot stat the file, we assume it's empty or missing and return
1289 an empty buffer, as opposed to a nil, so the caller can generate defaults
1290 or error if an empty/missing file isn't tolerated.
/*
	Read an entire file into a nil terminated buffer (see block comment above).
	Returns the buffer (caller must free) or nil; a missing/unstat-able file
	yields an empty, not nil, buffer so the caller can apply defaults.
*/
static char* uta_fib( char const* fname ) {
	off_t fsize = 8192;				// size of the file
	off_t nread;					// number of bytes read
	char* buf;						// input buffer

	if( (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {			// empty file

			fsize = stats.st_size;				// stat ok, save the file size

			fsize = 8192;						// stat failed, we'll leave the file open and try to read a default max of 8k

	if( fd < 0 ) {								// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {	// one byte for the terminating nil

	// add a size limit check here

	if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {		// enough to add nil char to make string

	nread = read( fd, buf, fsize );
	if( nread < 0 || nread > fsize ) {			// failure of some kind
		errno = EFBIG;							// likely too much to handle
1343 // --------------------- initialisation/creation ---------------------------------------------
1345 Create and initialise a route table; Returns a pointer to the table struct.
/*
	Create and initialise an empty route table; returns a pointer to the table
	struct or nil on allocation failure. The gate mutex and endpoint hash are
	shared across ALL route tables and are taken from the context.
*/
static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {

	if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {

	memset( rt, 0, sizeof( *rt ) );

	if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {

	rt->gate = ctx->rtgate;						// single mutex needed for all route tables
	rt->ephash = ctx->ephash;					// all route tables share a common endpoint hash
	pthread_mutex_init( rt->gate, NULL );		// NOTE(review): re-inits the shared gate on every table create -- confirm intended
1372 Clones one of the spaces in the given table.
1373 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1374 Space is the space in the old table to copy. Space 0 uses an integer key and
1375 references rte structs. All other spaces use a string key and reference endpoints.
/*
	Clone one space of the source route table (srt) into the new table (nrt),
	allocating nrt if nil (see block comment above). Space 0 holds ref counted
	rte structs keyed by integer; all other spaces hold endpoint references
	keyed by string.
*/
static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
	endpoint_t* ep;					// an endpoint (ignore sonar complaint about const*)
	rtable_ent_t* rte;				// a route table entry (ignore sonar complaint about const*)
	void* sst;						// source symtab
	void* nst;						// new symtab
	thing_list_t things;			// things from the space to copy

	int free_on_err = 0;			// set when we allocated nrt here and must drop it on error

	if( nrt == NULL ) {				// make a new table if needed
		nrt = uta_rt_init( ctx );

	if( srt == NULL ) {				// source was nil, just give back the new table

	things.nalloc = 2048;

	things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
	things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
	if( things.things == NULL || things.names == NULL ){		// either alloc failed; free whichever succeeded
		if( things.things != NULL ) {
			free( things.things );

		if( things.names != NULL ) {
			free( things.names );

		rmr_sym_free( nrt->hash );

	// NOTE(review): nested sizeof below zeroes only sizeof(size_t) bytes, not the
	// whole array; likely intended sizeof( void * ) * things.nalloc (compare next line)
	memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
	memset( things.names, 0, sizeof( char * ) * things.nalloc );

	sst = srt->hash;				// convenience pointers (src symtab)

	rmr_sym_foreach_class( sst, space, collect_things, &things );		// collect things from this space
	if( things.error ) {			// something happened and capture failed
		rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
		free( things.things );
		free( things.names );

		rmr_sym_free( nrt->hash );

	if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
	for( i = 0; i < things.nused; i++ ) {
		if( space ) {				// string key, epoint reference
			ep = (endpoint_t *) things.things[i];
			rmr_sym_put( nst, things.names[i], space, ep );			// slam this one into the new table

			rte = (rtable_ent_t *) things.things[i];
			rte->refs++;			// rtes can be removed, so we track references
			rmr_sym_map( nst, rte->key, rte );		// add to hash using numeric mtype/sub-id as key (default to space 0)

	free( things.things );
	free( (void *) things.names );
1463 Given a destination route table (drt), clone from the source (srt) into it.
1464 If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1465 allocate the drt if that was nil too). If all is true (1), then we will clone both
1466 the MT and the ME spaces; otherwise only the ME space is cloned.
/*
	Clone the source table (srt) into the destination (drt), allocating drt if
	nil (see block comment above). The ME space is always cloned; the MT space
	is cloned only when all is true.
*/
static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
	endpoint_t* ep;					// an endpoint
	rtable_ent_t* rte;				// a route table entry

	drt = uta_rt_init( ctx );

	drt->ephash = ctx->ephash;		// all rts reference the same EP symtab
	rt_clone_space( ctx, srt, drt, RT_ME_SPACE );

	rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1493 Prepares the "new" route table for populating. If the old_rtable is not nil, then
1494 we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1495 context to be referenced by the new pointer; the old pointer is set to nil.
1497 If the old table doesn't exist, then a new table is created and the new pointer is
1498 set to reference it.
1500 The ME namespace references endpoints which do not need to be released, therefore we
1501 do not need to run that portion of the table to deref like we do for the RTEs.
/*
	Prepare the "new" route table for populating (see block comment above).
	When an old table exists we wait (bounded spin + sleep) for its reference
	count to drain, clear it, and reuse it; otherwise the live table is cloned
	(SOME or ALL spaces depending on the all flag).
*/
static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {

	if( (rt = ctx->old_rtable) != NULL ) {
		ctx->old_rtable = NULL;

		pthread_mutex_lock( ctx->rtgate );		// ref count must be read under lock
		ref_count = rt->ref_count;
		pthread_mutex_unlock( ctx->rtgate );

		while( ref_count > 0 ) {				// wait for all who are using to stop
			if( counter++ > 1000 ) {
				rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" );

			usleep( 1000 );						// small sleep to yield the processor if that is needed

			pthread_mutex_lock( ctx->rtgate );	// re-sample the count under lock
			ref_count = rt->ref_count;
			pthread_mutex_unlock( ctx->rtgate );

		if( rt->hash != NULL ) {
			rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );	// deref and drop if needed
			rmr_sym_clear( rt->hash );			// clear all entries from the old table

		rt->error = 0;							// table with errors can be here, so ensure clear before attempt to load

	rt = uta_rt_clone( ctx, ctx->rtable, rt, all );		// also sets the ephash pointer
	if( rt != NULL ) {							// very small chance for nil, but not zero, so test
		rt->ref_count = 0;						// take no chances; ensure it's 0!

		rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
		rt = uta_rt_init( ctx );				// must have something, but mark it in error state
1556 Given a name, find the endpoint struct in the provided route table.
/*
	Given a name, return the endpoint struct from the table's shared endpoint
	hash (class 1), or nil when the args are bad or the name is unknown.
*/
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {

	if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {

	return rmr_sym_get( rt->ephash, ep_name, 1 );
1568 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1569 Does NOT destroy the gate as it's a common gate for ALL route tables.
/*
	Drop the given route table: deref/free each type 0 (rte) entry, then free
	the symtab itself. The gate mutex is shared by ALL tables and is NOT
	destroyed here.
*/
static void uta_rt_drop( route_table_t* rt ) {

	rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );	// free each rte referenced by the hash, but NOT the endpoints
	rmr_sym_free( rt->hash );								// free all of the hash related data
1582 Look up and return the pointer to the endpoint stuct matching the given name.
1583 If not in the hash, a new endpoint is created, added to the hash. Should always
/*
	Look up the endpoint struct matching the given name; when not yet in the
	hash a new endpoint is created, initialised, and added. Returns nil only
	on bad args or malloc failure.
*/
static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {

	if( !rt || !ep_name || ! *ep_name ) {
		rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );

	if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {		// not there yet, make
		if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
			rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );

		ep->notify = 1;							// show notification on first connection failure
		ep->open = 0;							// not connected
		ep->addr = uta_h2ip( ep_name );
		ep->name = strdup( ep_name );
		pthread_mutex_init( &ep->gate, NULL );	// init with default attrs
		memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );	// zero the per-endpoint send counters

		rmr_sym_put( rt->ephash, ep_name, 1, ep );
1617 Given a session id and message type build a key that can be used to look up the rte in the route
1618 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
/*
	Build the 64 bit route table hash key: sub id in the high word, message
	type in the low word. An unset sub id (UNSET_SUBID, expected -1) maps to
	all ones in the high word.
*/
static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {

	if( sub_id == UNSET_SUBID ) {
		key = 0xffffffff00000000 | mtype;

		key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1633 Given a route table and meid string, find the owner (if known). Returns a pointer to
1634 the endpoint struct or nil.
/*
	Given a route table and meid string, return the owning endpoint from the
	ME space of the hash, or nil when unknown or the args are bad.
*/
static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
	endpoint_t const* ep;			// the ep we found in the hash

	if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {

	return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1647 This returns a pointer to the currently active route table and ups
1648 the reference count so that the route table is not freed while it
1649 is being used. The caller MUST call release_rt() when finished
1652 Care must be taken: the ctx->rtable pointer _could_ change during the time
1653 between the release of the lock and the return. Therefore we MUST grab
1654 the current pointer when we have the lock so that if it does we don't
1655 return a pointer to the wrong table.
1657 This will return NULL if there is no active table.
/*
	Return the active route table with its use count bumped under the gate
	lock so it cannot be freed while in use; caller MUST pair this with
	release_rt() (see block comment above). Returns nil if no active table.
*/
static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
	route_table_t* rrt;				// return value

	if( ctx == NULL || ctx->rtable == NULL ) {

	pthread_mutex_lock( ctx->rtgate );			// must hold lock to bump use
	rrt = ctx->rtable;							// must stash the pointer while we hold lock

	pthread_mutex_unlock( ctx->rtgate );

	return rrt;									// pointer we upped the count with
1675 This will "release" the route table by reducing the use counter
1676 in the table. The table may not be freed until the counter reaches
0, so it's imperative that the pointer be "released" when it is
1678 fetched by get_rt(). Once the caller has released the table it
1679 may not safely use the pointer that it had.
1681 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1682 if( ctx == NULL || rt == NULL ) {
1686 pthread_mutex_lock( ctx->rtgate ); // must hold lock
1687 if( rt->ref_count > 0 ) { // something smells if it's already 0, don't do antyhing if it is
1690 pthread_mutex_unlock( ctx->rtgate );