3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
49 #include <RIC_message_types.h> // needed for route manager messages
53 Passed to a symtab foreach callback to construct a list of pointers from
56 typedef struct thing_list {
63 // ---- debugging/testing -------------------------------------------------------------------------
66 Dump some stats for an endpoint in the RT. This is generally called to
67 verify endpoints after a table load/change.
// Symtab foreach callback for endpoint entries. 'thing' is treated as an endpoint_t and
// 'vcounter' as an optional int counter; the endpoint's name and open flag are written to
// the debug log. st, entry and name are not referenced on the visible lines.
// NOTE(review): this listing elides lines, so the bodies of the two guards are not visible
// here -- presumably an early return and a counter increment; confirm against the full file.
69 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
73 if( (ep = (endpoint_t *) thing) == NULL ) { // entry carries no endpoint
77 if( (counter = (int *) vcounter) != NULL ) { // caller may pass a nil counter
81 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
85 Called to count meid entries in the table. The meid points to an 'owning' endpoint
86 so we can list what we find
// Symtab foreach callback for meid entries. 'thing' is treated as the owning endpoint_t and
// 'vcounter' as an optional int counter; the meid (the symtab name), the owner's name and
// the owner's open flag are written to the debug log.
// NOTE(review): the guard bodies are elided from this listing -- presumably an early return
// and a counter increment; confirm against the full file.
88 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
92 if( (ep = (endpoint_t *) thing) == NULL ) { // meid has no owner to report
96 if( (counter = (int *) vcounter) != NULL ) { // caller may pass a nil counter
100 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
104 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
105 the 'source' information and is added to each message.
// Symtab foreach callback which writes the send counters of one endpoint to the log at
// INFO level. 'thing' is treated as an endpoint_t and 'vid' as the source id string.
// NOTE(review): lines are elided from this listing; what is done when vid is nil, and the
// three log arguments between the timestamp and the counters, are not visible here.
107 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
111 if( (ep = (endpoint_t *) thing) == NULL ) { // entry carries no endpoint
115 if( (id = (char *) vid) == NULL ) { // no source id supplied by the caller
119 rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
120 (long long) time( NULL ), // ts= is the current wall clock in seconds
124 ep->scounts[EPSC_GOOD], // succ=
125 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS], // fail= is the sum of the hard and soft counts
126 ep->scounts[EPSC_FAIL], // hard=
127 ep->scounts[EPSC_TRANS] ); // soft=
131 Dump stats for a route entry in the table.
// Symtab foreach callback for route table entries. 'thing' is treated as an rtable_ent_t and
// 'vcounter' as an optional int counter; the key, the message type and subscription id
// extracted from the key, the group count and the reference count are written to the debug log.
// NOTE(review): the guard bodies are elided from this listing. The key is printed with %lx,
// which assumes long is as wide as the key -- TODO confirm the key's declared type.
133 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
135 rtable_ent_t* rte; // thing is really an rte
139 if( (rte = (rtable_ent_t *) thing) == NULL ) { // nothing to report
143 if( (counter = (int *) vcounter) != NULL ) { // caller may pass a nil counter
147 mtype = rte->key & 0xffff; // low 16 bits of the key are reported as the message type
148 sid = (int) (rte->key >> 32); // bits above 31 are reported as the subscription id
150 rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
154 Given a route table, cause some stats to be spit out.
// Writes a debug-level summary of a route table: walks the endpoint (RT_NAME_SPACE), message
// type (RT_MT_SPACE) and meid (RT_ME_SPACE) classes of rt->hash with the matching *_stats
// callback and logs the count accumulated by each walk. Logs "nil table" for a missing table.
// NOTE(review): lines are elided from this listing. On the visible lines the malloc result is
// not checked before being handed to the callbacks and dereferenced; the initialisation,
// any reset between the three walks and the release of the counter are not visible -- confirm.
156 static void rt_stats( route_table_t* rt ) {
160 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
164 counter = (int *) malloc( sizeof( int ) ); // one heap int shared by all three walks
166 rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
167 rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
168 rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
169 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
171 rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
173 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
174 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
176 rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
178 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
179 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
185 Given a route table, cause endpoint counters to be written to stderr. The id
186 parm is written as the "source" in the output.
// Logs the send counters of the endpoints in a route table by running ep_counts over one
// class of rt->hash; id is handed to the callback as its source string. An INFO message is
// logged instead when the table is considered empty (the test itself is elided from this listing).
// NOTE(review): the class is given as the literal 1 here while rt_stats names its classes
// (RT_NAME_SPACE etc.) -- presumably 1 is the endpoint name space; confirm and use the macro.
188 static void rt_epcounts( route_table_t* rt, char* id ) {
190 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
194 rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
197 // ------------ route manager communication -------------------------------------------------
199 Send a request for a table update to the route manager. Updates come in
200 async, so send and go.
202 pctx is the private context for the thread; ctx is the application context
203 that we need to be able to send the application ID in case rt mgr needs to
204 use it to identify us.
206 Returns 0 if we were not able to send a request.
// Builds a table request (mtype RMRRM_REQ_TABLE) whose payload is "<my_name> ts=<seconds>\n"
// and sends it on the wormhole ctx->rtg_whid using the private context pctx. On a send state
// other than RMR_OK the failure is logged and the wormhole is closed. The message buffer is
// released with rmr_free_msg.
// NOTE(review): lines are elided from this listing, so the return statements and any nil
// check on the allocated buffer are not visible. On the visible lines the buffer returned by
// rmr_wh_send_msg is dereferenced directly, and the send uses pctx while the close uses ctx --
// confirm both are intended.
208 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
212 if( ctx->rtg_whid < 0 ) { // no wormhole to the route manager
216 smsg = rmr_alloc_msg( pctx, 1024 );
218 smsg->mtype = RMRRM_REQ_TABLE;
220 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) ); // bound matches the size requested from rmr_alloc_msg
221 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
222 smsg->len = strlen( smsg->payload ) + 1; // length includes the terminating nil
224 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
225 if( (state = smsg->state) != RMR_OK ) {
226 rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
227 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
231 rmr_free_msg( smsg );
238 Send an ack to the route table manager for a table ID that we are
239 processing. State is RMR_OK for success; any other value is reported as ERR. Reason might
240 be populated if we know why there was a failure.
242 Context should be the PRIVATE context that we use for messages
243 to route manager and NOT the user's context.
245 If a message buffer is passed we use that and use return to sender
246 assuming that this might be a response to a call and that is needed
247 to send back to the proper calling thread. If msg is nil, we allocate
// Sends a table state message (mtype RMRRM_TABLE_STATE) to the route manager. The payload is
// "<OK|ERR> <table_id> <reason>\n": OK is written when state equals RMR_OK, "<id-missing>"
// replaces a nil table_id and an empty string replaces a nil reason. Nothing is sent when ctx
// is nil, when ctx->rtg_whid is negative, or when CFL_NO_RTACK is set in ctx->flags
// (the bodies of these guards are elided; presumably plain returns).
// NOTE(review): lines are elided from this listing. The tests that choose between the
// realloc/alloc pair and between the rts/wormhole send pair are not visible, nor is any nil
// check on the buffers those calls return -- confirm against the full file.
250 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
252 int payload_size = 1024;
254 if( ctx == NULL || ctx->rtg_whid < 0 ) {
258 if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
263 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
266 smsg = rmr_alloc_msg( ctx, payload_size );
270 smsg->mtype = RMRRM_TABLE_STATE;
272 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", // output is truncated to fit the payload
273 table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
275 smsg->len = strlen( smsg->payload ) + 1; // length includes the terminating nil
277 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
279 smsg = rmr_rts_msg( ctx, smsg );
281 smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
283 if( (state = smsg->state) != RMR_OK ) { // the state parameter is reused for the send result
284 rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
285 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
290 rmr_free_msg( smsg ); // if not our message we must free the leftovers
295 // ------------------------------------------------------------------------------------------------
297 Little ditty to trim whitespace and trailing comments. Like shell, trailing comments
298 must be at the start of a word (i.e. must be immediately preceded by whitespace).
// Trims a nil terminated string in place: leading whitespace is skipped, a '#' which follows
// whitespace is treated as the start of a trailing comment, and trailing whitespace is
// located by scanning back from the end of the string.
// NOTE(review): lines are elided from this listing, so the writes which actually terminate
// the string and the final return are not visible. For an empty string strlen(buf) - 1 wraps
// before it is added to buf; the loop test then fails immediately, but confirm the elided
// code tolerates that. The isspace arguments are plain char; a cast to unsigned char is
// needed if bytes above 0x7f can appear.
300 static char* clip( char* buf ) {
303 while( *buf && isspace( *buf ) ) { // skip leading whitespace
307 if( (tok = strchr( buf, '#' )) != NULL ) { // candidate comment symbol
309 return buf; // just push back; leading comment sym handled there
312 if( isspace( *(tok-1) ) ) { // only a '#' which starts a word begins a comment
317 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
324 This accepts a pointer to a nil terminated string, and ensures that there is a
325 newline as the last character. If there is not, a new buffer is allocated and
326 the newline is added. If a new buffer is allocated, the buffer passed in is
327 freed. The function returns a pointer which the caller should use, and must
328 free. In the event of an error, a nil pointer is returned.
// Ensures that a nil terminated buffer ends with a newline. A nil buffer, or one shorter
// than two characters, gets a fresh two byte allocation; a longer buffer which does not end
// in '\n' is copied into an allocation of len + 2 bytes and a newline is placed after the
// copied text (a warning is logged in that case).
// NOTE(review): lines are elided from this listing; the stores into the two byte buffer, the
// terminating nil, the free of the original buffer and the return value are not visible.
// Presumably a non-nil buffer shorter than two characters must also be released -- confirm.
330 static char* ensure_nlterm( char* buf ) {
335 if( buf == NULL || (len = strlen( buf )) < 2 ) { // strlen is only evaluated for a non-nil buffer
336 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
341 if( buf[len-1] != '\n' ) {
342 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
343 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
344 memcpy( nb, buf, len ); // copies the text without its terminating nil
345 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
357 Given a message type create a route table entry and add to the hash keyed on the
358 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
359 is the number of group slots to allocate in the entry.
// Allocates and zeroes a route table entry, gives it an array of nrrgroups round robin group
// pointers (or a nil array), and maps it into rt->hash under key. An entry already mapped to
// the key is pulled from the hash and passed to del_rte before the new one is mapped.
// NOTE(review): lines are elided from this listing. The handling of a negative nrrgroups, the
// test which selects between the group allocation and the nil assignment, the clean up when
// the group allocation fails, any check of rt, and the return statements are not visible.
361 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
363 rtable_ent_t* old_rte; // entry which was already in the table for the key
369 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
370 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
373 memset( rte, 0, sizeof( *rte ) );
377 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
382 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
386 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups ); // every group slot starts out nil
388 rte->rrgroups = NULL;
391 rte->nrrgroups = nrrgroups;
393 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
394 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
397 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
399 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
404 This accepts partially parsed information from an rte or mse record sent by route manager or read from
406 ts_field is the msg-type,sender field
407 subid is the integer subscription id
408 rr_field is the endpoint information for round robining messages over
410 If all goes well, this will add an RTE to the table under construction.
412 The ts_field is checked to see if we should ingest this record. We ingest if one of
414 there is no sender info (a generic entry for all)
415 there is sender and our host:port matches one of the senders
416 the sender info is an IP address that matches one of our IP addresses
// Adds a route table entry to ctx->new_rtable from a partially parsed rte/mse record.
//   ts_field  message type optionally followed by a comma separated sender list
//   subid     subscription id which is combined with the message type to form the key
//   rr_field  endpoint groups separated by ';' with endpoints separated by ',' or "%meid"
//   vlevel    verbosity; values above 1 force extra debug output
// The record is used when ts_field has no sender list, when ctx->my_name is in the list, or
// when has_myip matches the list against ctx->ip_list. Endpoints equal to ctx->my_name are
// not added. A record which is not used is reported (debug only) with its ts_field.
// NOTE(review): lines are elided from this listing (declarations and closing braces). On the
// visible lines the entry returned by uta_add_rte is dereferenced without a nil check, and
// key is printed with %lx which assumes a 64 bit long -- confirm both.
418 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
419 rtable_ent_t* rte; // route table entry added
422 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
426 int ngtoks; // number of tokens in the group list
427 int grp; // index into group list
429 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
430 rr_field = clip( rr_field );
432 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
433 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
434 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
436 key = build_rt_key( subid, atoi( ts_field ) ); // atoi stops at the first comma, yielding the mtype
438 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
440 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
441 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
442 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
444 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
445 rte->mtype = atoi( ts_field ); // capture mtype for debugging
447 for( grp = 0; grp < ngtoks; grp++ ) {
448 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
449 for( i = 0; i < ntoks; i++ ) {
450 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
451 if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
452 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
459 if( DEBUG || (vlevel > 2) ) {
460 rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", ts_field ); // the token array is never filled on this path, so report the sender field itself
466 Trash_entry takes a partially parsed record from the input and
467 will delete the entry if the sender,mtype matches us or it's a
468 generic mtype. The reference in the new table is removed and the
469 refcounter for the actual rte is decreased. If that ref count is
470 0 then the memory is freed (handled by the del_rte call).
// Removes the route table entry described by a partially parsed delete record from
// ctx->new_rtable. The record is acted on when ts_field has no sender list, when
// ctx->my_name is in the list, or when has_myip matches the list against ctx->ip_list;
// otherwise the skip is reported (debug only). A matching entry is pulled from the hash, the
// key is deleted and the entry is passed to del_rte. Nothing is attempted when ctx, the new
// table or its hash is nil (guard body elided; presumably a plain return).
// NOTE(review): lines are elided from this listing; the test which separates the "delete"
// path from the "could not find" path is not visible. key is printed with %lx which assumes
// a 64 bit long -- confirm.
472 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
473 rtable_ent_t* rte; // route table entry to be 'deleted'
476 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
479 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
483 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
485 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
486 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
487 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
489 key = build_rt_key( subid, atoi( ts_field ) );
490 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
492 if( DEBUG || (vlevel > 1) ) {
493 rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
495 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
496 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
498 if( DEBUG || (vlevel > 1) ) {
499 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
503 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
508 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
509 the 'owner' which should be the dns name or IP address of an endpoint
510 the meid_list is a space separated list of me IDs
512 This function assumes the caller has vetted the pointers as needed.
514 For each meid in the list, an entry is pushed into the hash which references the owner
515 endpoint such that when the meid is used to route a message it references the endpoint
// Adds or replaces meid entries in rtab->hash (class RT_ME_SPACE) from an mme_ar record.
//   rtab       table being built
//   owner      name of the endpoint which owns every meid in the list
//   meid_list  space separated meids; tokenised in place
//   vlevel     verbosity; values above 1 force extra debug output
// For each meid the owner endpoint is obtained with rt_ensure_ep and stored under the meid;
// a warning is logged when the endpoint cannot be obtained.
// NOTE(review): lines are elided from this listing (declarations, else and closing braces).
// rt_ensure_ep is called once per meid although owner does not change inside the loop.
518 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
524 endpoint_t* ep; // endpoint struct for the owner
526 owner = clip( owner ); // ditch extra whitespace and trailing comments
527 meid_list = clip( meid_list );
529 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
530 for( i = 0; i < ntoks; i++ ) {
531 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
532 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
533 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
535 rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s\n", owner ); // newline terminated like every other log message in this file
541 Given the tokens from an mme_del, delete the listed meid entries from the new
542 table. The list is a space separated list of meids.
544 The meids in the hash reference endpoints which are never deleted and so
545 the only thing that we need to do here is to remove the meid from the hash.
547 This function assumes the caller has vetted the pointers as needed.
// Deletes each meid named in a space separated list from rtab->hash (class RT_ME_SPACE).
// The list is clipped and tokenised in place; each deletion is reported when DEBUG is set or
// vlevel is above 1. Nothing is done when the table has no hash (guard body elided;
// presumably a plain return). rtab itself is not checked on the visible lines.
549 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
555 if( rtab->hash == NULL ) {
559 meid_list = clip( meid_list );
561 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
562 for( i = 0; i < ntoks; i++ ) {
563 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // only the hash reference is removed here
564 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
569 Parse a partially parsed meid record. Tokens[0] should be one of:
570 meid_map, mme_ar, mme_del.
572 pctx is the private context needed to return an ack/nack using the provided
573 message buffer with the route managers address info.
// Handles one tokenised meid record: meid_map (start or end), mme_ar or mme_del.
//   ctx     context which owns rtable, old_rtable, new_rtable and table_id
//   pctx    private context passed to send_rt_ack for acks and nacks
//   mbuf    message buffer passed through to send_rt_ack
//   tokens  record fields; tokens[0] names the record type
//   ntoks   number of fields in tokens
//   vlevel  verbosity; values above 1 force extra debug output
// meid_map start: drops and nacks any table in progress, records the table id and begins a
//   new table cloned from the active one with its mupdates count reset.
// meid_map end: when a count is supplied and a table is in progress the count is compared
//   with mupdates; a mismatch is logged, nacked and the table dropped. Otherwise the table in
//   progress becomes active, the previously active table becomes old_rtable and an ack is sent.
// mme_ar / mme_del: applied to the table in progress and counted in mupdates.
// NOTE(review): lines are elided from this listing (returns, else and closing braces, the
// token count tests around table_id and mme_del). On the visible lines the table returned by
// uta_rt_clone_all is dereferenced without a nil check -- confirm.
575 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
578 if( tokens == NULL || ntoks < 1 ) {
579 return; // silent but should never happen
582 if( ntoks < 2 ) { // must have at least two for any valid request record
583 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
587 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
588 tokens[1] = clip( tokens[1] );
589 if( *(tokens[1]) == 's' ) {
590 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
591 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
592 uta_rt_drop( ctx->new_rtable );
593 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
596 if( ctx->table_id != NULL ) {
597 free( ctx->table_id );
600 ctx->table_id = strdup( clip( tokens[2] ) );
602 ctx->table_id = NULL;
604 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
605 ctx->new_rtable->mupdates = 0;
606 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
608 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
609 if( ntoks > 2 && ctx->new_rtable != NULL ) { // meid_map | end | <count> |??? given; the count can only be verified when a table is in progress
610 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
611 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
612 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->mupdates ); // report the meid counter which was compared
613 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
614 uta_rt_drop( ctx->new_rtable );
615 ctx->new_rtable = NULL;
619 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
622 if( ctx->new_rtable ) {
623 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
624 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
625 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
626 ctx->new_rtable = NULL;
627 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
628 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
631 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
632 rt_stats( ctx->old_rtable );
633 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
634 rt_stats( ctx->rtable );
637 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
638 ctx->new_rtable = NULL;
646 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
647 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
651 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
652 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
653 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
656 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
657 ctx->new_rtable->mupdates++;
660 if( strcmp( tokens[0], "mme_del" ) == 0 ) {
662 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
665 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
666 ctx->new_rtable->mupdates++;
671 Parse a single record received from the route table generator, or read
672 from a static route table file. Start records cause a new table to
673 be started (if a partial table was received it is discarded). Table
674 entry records are added to the currently 'in progress' table, and an
675 end record causes the in progress table to be finalised and the
676 currently active table is replaced.
678 The updated table will be activated when the *|end record is encountered.
679 However, to allow for a "double" update, where both the meid map and the
680 route table must be updated at the same time, the end indication on a
681 route table (new or update) may specify "hold" which indicates that meid
682 map entries are to follow and the updated route table should be held as
683 pending until the end of the meid map is received and validated.
685 CAUTION: we are assuming that there is a single route/meid map generator
686 and as such only one type of update is received at a time; in other
687 words, the sender cannot mix update records and if there is more than
688 one sender process they must synchronise to avoid issues.
691 For a RT update, we expect:
692 newrt | start | <table-id>
693 newrt | end | <count>
694 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
695 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
696 mse| <mtype>[,sender] | <sub-id> | %meid
699 For a meid map update we expect:
700 meid_map | start | <table-id>
701 meid_map | end | <count> | <md5-hash>
702 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
703 mme_del | <meid0> <meid1>...<meidn>
706 The pctx is our private context that must be used to send acks/status
707 messages back to the route manager. The regular ctx is the ctx that
708 the user has been given and thus that's where we have to hang the route
709 table we're working with.
711 If mbuf is given, and we need to ack, then we ack using the mbuf and a
712 return to sender call (allows route manager to use wh_call() to send
713 an update and rts is required to get that back to the right thread).
714 If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
// Parses one '|' separated route table record and applies it to the table under
// construction in ctx (new_rtable). Dispatch is on the first character of the first field:
//   0 / '#'  blank and comment records
//   'd'      delete record, handed to trash_entry
//   'n'      newrt start/end
//   'm'      mse records are handed to build_entry, all others to meid_parser
//   'r'      rte records, handed to build_entry
//   'u'      update start/end
// Acks and nacks are sent through send_rt_ack using pctx and mbuf.
// NOTE(review): many lines are elided from this listing (declarations, token count tests,
// breaks, returns and closing braces), so the comments below describe the visible lines only.
717 static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
719 int ntoks; // number of tokens found in something
721 int grp; // group number
722 rtable_ent_t* rte; // route table entry added
724 char* tok; // pointer into a token or string
731 while( *buf && isspace( *buf ) ) { // skip leading whitespace
// NOTE(review): for an empty buf strlen(buf) - 1 wraps before it is added to buf; the loop
// test then fails at once, but confirm that the elided terminating write tolerates it.
734 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
737 memset( tokens, 0, sizeof( tokens ) ); // fields which are not present stay nil
738 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
739 tokens[0] = clip( tokens[0] );
740 switch( *(tokens[0]) ) {
741 case 0: // ignore blanks
743 case '#': // and comment lines
746 case 'd': // del | [sender,]mtype | sub-id
747 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
752 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
756 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
757 ctx->new_rtable->updates++;
760 case 'n': // newrt|{start|end}
761 tokens[1] = clip( tokens[1] );
762 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
// NOTE(review): new_rtable is dereferenced here while the nil test only appears further down;
// the line elided just above may guard it (the 'u' case has an explicit guard) -- confirm.
764 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
765 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
766 ctx->new_rtable->updates, tokens[2] );
767 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
768 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
769 uta_rt_drop( ctx->new_rtable );
770 ctx->new_rtable = NULL;
775 if( ctx->new_rtable ) {
776 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
777 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
778 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
779 ctx->new_rtable = NULL;
780 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
783 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
784 rt_stats( ctx->old_rtable );
785 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
786 rt_stats( ctx->rtable );
789 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
790 ctx->rtable_ready = 1; // route based sends can now happen
792 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
793 ctx->new_rtable = NULL;
795 } else { // start a new table.
796 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
797 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
799 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
800 uta_rt_drop( ctx->new_rtable );
803 if( ctx->table_id != NULL ) {
804 free( ctx->table_id );
807 ctx->table_id = strdup( clip( tokens[2] ) );
809 ctx->table_id = NULL;
812 ctx->new_rtable = NULL; // overwritten by the clone on the next line
// NOTE(review): the clone result is dereferenced on the following line without a nil check.
813 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meid entries from active table
814 ctx->new_rtable->updates = 0; // init count of entries received
815 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
819 case 'm': // mse entry or one of the meid_ records
820 if( strcmp( tokens[0], "mse" ) == 0 ) {
821 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
826 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
830 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); // mse field order: mtype[,sender] | sub-id | endpoints
831 ctx->new_rtable->updates++;
833 meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
837 case 'r': // assume rt entry
838 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
842 ctx->new_rtable->updates++; // counted before the entry is built
843 if( ntoks > 3 ) { // assume new entry with subid last
844 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
846 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
850 case 'u': // update current table, not a total replacement
851 tokens[1] = clip( tokens[1] );
852 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
853 if( ctx->new_rtable == NULL ) { // update table not in progress
// NOTE(review): unlike the newrt end path, no send_rt_ack call is visible on a count mismatch here.
858 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
859 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
860 ctx->new_rtable->updates, tokens[2] );
861 uta_rt_drop( ctx->new_rtable );
862 ctx->new_rtable = NULL;
867 if( ctx->new_rtable ) {
868 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
869 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
870 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
871 ctx->new_rtable = NULL;
872 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
875 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
876 rt_stats( ctx->old_rtable );
877 rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
878 rt_stats( ctx->rtable );
881 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
882 ctx->new_rtable = NULL;
884 } else { // start a new table.
885 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
886 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
887 uta_rt_drop( ctx->new_rtable );
891 if( ctx->table_id != NULL ) {
892 free( ctx->table_id );
894 ctx->table_id = strdup( clip( tokens[2] ) );
// NOTE(review): the clone result is dereferenced on the following line without a nil check.
897 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
898 ctx->new_rtable->updates = 0; // init count of updates received
899 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
904 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
911 This function attempts to open a static route table in order to create a 'seed'
912 table during initialisation. The environment variable RMR_SEED_RT is expected
913 to contain the necessary path to the file. If missing, or if the file is empty,
914 no route table will be available until one is received from the generator.
916 This function is probably most useful for testing situations, or extreme
917 cases where the routes are static.
// Loads a seed route table from the file named by the environment variable ENV_SEED_RT.
// The file is read with uta_fib, forced to end in a newline by ensure_nlterm, and each
// newline terminated record is handed to parse_rt_rec with a nil private context and a nil
// message buffer. A warning is logged when the file cannot be read or a record has no
// terminating newline.
// NOTE(review): lines are elided from this listing; the test which selects the bytes rewritten
// to '\n', the record stepping, the record counter updates and the release of fbuf are not
// visible -- confirm against the full file.
919 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
922 char* fbuf; // buffer with file contents
923 char* rec; // start of the record
924 char* eor; // end of the record
925 int rcount = 0; // record count for debug
927 if( (fname = getenv( ENV_SEED_RT )) == NULL ) { // no seed file configured
931 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
932 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
936 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
937 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
939 *eor = '\n'; // will look like a blank line which is ok
944 while( rec && *rec ) {
946 if( (eor = strchr( rec, '\n' )) != NULL ) {
949 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
950 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
955 parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
960 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
965 Callback driven for each named thing in a symtab. We collect the pointers to those
966 things for later use (cloning).
// Symtab foreach callback which appends the entry's name and thing pointer to the
// thing_list_t passed as vthing_list, advancing its nused index. Entries are not recorded
// when the list or the thing is nil (guard bodies elided; presumably plain returns).
// NOTE(review): no capacity test against the size of the names/things arrays is visible on
// these lines -- confirm the list is always allocated large enough by the caller.
968 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
971 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
975 if( thing == NULL ) {
979 tl->names[tl->nused] = name; // the name/key
980 tl->things[tl->nused++] = thing; // save a reference to the thing
984 Called to delete a route table entry struct. We delete the array of endpoint
985 pointers, but NOT the endpoints referenced as those are referenced from
988 Route table entries can be concurrently referenced by multiple symtabs, so
989 the actual delete happens only if decrementing the rte's ref count takes it
990 to 0. Thus, it is safe to call this function across a symtab when cleaning up
991 the symtab, or overlaying an entry.
993 This function uses ONLY the pointer to the rte (thing) and ignores the other
994 information that symtab foreach function passes (st, entry, and data) which
995 means that it _can_ safely be used outside of the foreach setting. If
996 the function is changed to depend on any of these three, then a stand-alone
997 rte_cleanup() function should be added and referenced by this, and references
998 to this outside of the foreach world should be changed.
// Releases a route table entry: 'thing' is treated as an rtable_ent_t; an entry whose refs
// count is still above zero is kept, otherwise the endpoint pointer list of every non-nil
// round robin group, the group array and the entry itself are freed. The endpoints which
// the lists point at are not freed. st, entry, name and data are not used on the visible lines.
// NOTE(review): lines are elided from this listing; the reference count decrement, the
// returns and the release of each group struct are not visible -- confirm.
1000 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1004 if( (rte = (rtable_ent_t *) thing) == NULL ) {
1009 if( rte->refs > 0 ) { // something still referencing, so it lives
1013 if( rte->rrgroups ) { // clean up the round robin groups
1014 for( i = 0; i < rte->nrrgroups; i++ ) {
1015 if( rte->rrgroups[i] ) {
1016 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
1020 free( rte->rrgroups );
1023 free( rte ); // finally, drop the potato
/*
	Read an entire file into a buffer. We assume for route table files
	they will be smallish and so this won't be a problem.
	Returns a pointer to the buffer, or nil. Caller must free.
	Terminates the buffer with a nil character for string processing.

	If we cannot open the file, or it is empty (size reported as 0 or less), or
	no name is given, we return an empty buffer, as opposed to a nil, so the caller
	can generate defaults or error if an empty/missing file isn't tolerated.

	If the file can be opened, but not stat'd, at most 8k is read.

	Nil is returned, with errno set, when the buffer cannot be allocated (ENOMEM)
	or when the read fails (EFBIG). Errno is set after the file is closed so that
	the close cannot change the value the caller sees.

	The read is repeated until the expected number of bytes has been received or
	end of file is reached; a single read() is allowed to return fewer bytes than
	requested, and may be interrupted, neither of which is a reason to hand back
	a truncated table.
*/
static char* uta_fib( char* fname ) {
	struct stat	stats;
	off_t		fsize = 8192;	// size of the file (default max if stat fails)
	off_t		total = 0;		// number of bytes read so far
	ssize_t		nread = 0;		// number of bytes from the most recent read
	int			fd = -1;
	char*		buf;			// input buffer

	if( fname != NULL && (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {					// empty file
				close( fd );
				fd = -1;
			} else {
				fsize = stats.st_size;					// stat ok, save the file size
			}
		} else {
			fsize = 8192; 								// stat failed, we'll leave the file open and try to read a default max of 8k
		}
	}

	if( fd < 0 ) {											// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
			return NULL;
		}

		*buf = 0;
		return buf;
	}

	// add a size limit check here

	if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {		// enough to add nil char to make string
		close( fd );
		errno = ENOMEM;
		return NULL;
	}

	while( total < fsize ) {
		nread = read( fd, buf + total, (size_t) (fsize - total) );
		if( nread > 0 ) {
			total += nread;
			continue;
		}

		if( nread < 0 && errno == EINTR ) {				// interrupted before anything was read; just retry
			continue;
		}

		break;											// end of file (0) or a hard failure (<0)
	}

	close( fd );

	if( nread < 0 ) {										// failure of some kind
		free( buf );
		errno = EFBIG;										// value callers have always been given for a failed read
		return NULL;
	}

	buf[total] = 0;
	return buf;
}
1088 Create and initialise a route table; Returns a pointer to the table struct.
1090 static route_table_t* uta_rt_init( ) {
1093 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1096 memset( rt, 0, sizeof( *rt ) );
1098 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1107 Clones one of the spaces in the given table.
1108 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1109 Space is the space in the old table to copy. Space 0 uses an integer key and
1110 references rte structs. All other spaces use a string key and reference endpoints.
1112 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
1113 endpoint_t* ep; // an endpoint
1114 rtable_ent_t* rte; // a route table entry
1115 void* sst; // source symtab
1116 void* nst; // new symtab
1117 thing_list_t things; // things from the space to copy
1119 int free_on_err = 0;
1121 if( nrt == NULL ) { // make a new table if needed
1123 nrt = uta_rt_init();
1126 if( srt == NULL ) { // source was nil, just give back the new table
1130 things.nalloc = 2048;
1132 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1133 memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1134 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1135 memset( things.names, 0, sizeof( char * ) * things.nalloc );
1136 if( things.things == NULL ) {
1146 sst = srt->hash; // convenience pointers (src symtab)
1149 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
1151 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
1152 for( i = 0; i < things.nused; i++ ) {
1153 if( space ) { // string key, epoint reference
1154 ep = (endpoint_t *) things.things[i];
1155 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
1157 rte = (rtable_ent_t *) things.things[i];
1158 rte->refs++; // rtes can be removed, so we track references
1159 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
1163 free( things.things );
1164 free( (void *) things.names );
1169 Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1170 The endpoint and meid entries in the hash must be preserved.
1172 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1173 endpoint_t* ep; // an endpoint
1174 rtable_ent_t* rte; // a route table entry
1175 route_table_t* nrt = NULL; // new route table
1179 return uta_rt_init(); // no source to clone, just return an empty table
1182 nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE ); // allocate a new one, add endpoint refs
1183 rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new
1189 Creates a new route table and then clones _all_ of the given route table (references
1190 both endpoints AND the route table entries. Needed to support a partial update where
1191 some route table entries will not be deleted if not explicitly in the update and when
1192 we are adding/replacing meid references.
1194 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1195 endpoint_t* ep; // an endpoint
1196 rtable_ent_t* rte; // a route table entry
1197 route_table_t* nrt = NULL; // new route table
1201 return uta_rt_init(); // no source to clone, just return an empty table
1204 nrt = rt_clone_space( srt, nrt, RT_MT_SPACE ); // create new, clone all spaces to it
1205 rt_clone_space( srt, nrt, RT_NAME_SPACE );
1206 rt_clone_space( srt, nrt, RT_ME_SPACE );
1212 Given a name, find the endpoint struct in the provided route table.
1214 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1216 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1220 return rmr_sym_get( rt->hash, ep_name, 1 );
1224 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1226 static void uta_rt_drop( route_table_t* rt ) {
1231 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1232 rmr_sym_free( rt->hash ); // free all of the hash related data
1237 Look up and return the pointer to the endpoint stuct matching the given name.
1238 If not in the hash, a new endpoint is created, added to the hash. Should always
1241 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1244 if( !rt || !ep_name || ! *ep_name ) {
1245 rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1250 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1251 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1252 rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1257 ep->notify = 1; // show notification on first connection failure
1258 ep->open = 0; // not connected
1259 ep->addr = uta_h2ip( ep_name );
1260 ep->name = strdup( ep_name );
1261 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1262 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1264 rmr_sym_put( rt->hash, ep_name, 1, ep );
1272 Given a session id and message type build a key that can be used to look up the rte in the route
1273 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1275 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1278 if( sub_id == UNSET_SUBID ) {
1279 key = 0xffffffff00000000 | mtype;
1281 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1288 Given a route table and meid string, find the owner (if known). Returns a pointer to
1289 the endpoint struct or nil.
1291 static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) {
1292 endpoint_t* ep; // the ep we found in the hash
1294 if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1298 return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );