3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License") ;
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
49 #include <RIC_message_types.h> // needed for route manager messages
53 Passed to a symtab foreach callback to construct a list of pointers from
56 typedef struct thing_list {
63 // ---- debugging/testing -------------------------------------------------------------------------
66 Dump some stats for an endpoint in the RT. This is generally called to
67 verify endpoints after a table load/change.
69 This is called by the for-each mechanism of the symtab and the prototype is
70 fixe; we don't really use some of the parms, but have dummy references to
71 keep sonar from complaining.
73 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
77 if( (ep = (endpoint_t *) thing) == NULL ) {
81 if( (counter = (int *) vcounter) != NULL ) {
84 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
87 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
91 Called to count meid entries in the table. The meid points to an 'owning' endpoint
92 so we can list what we find
94 See note in ep_stats about dummy refs.
96 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
100 if( (ep = (endpoint_t *) thing) == NULL ) {
104 if( (counter = (int *) vcounter) != NULL ) {
107 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
110 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
114 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
115 the 'source' information and is added to each message.
117 See note above about dummy references.
119 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
123 if( (ep = (endpoint_t *) thing) == NULL ) {
124 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
128 if( (id = (char *) vid) == NULL ) {
132 rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
133 (long long) time( NULL ),
137 ep->scounts[EPSC_GOOD],
138 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
139 ep->scounts[EPSC_FAIL],
140 ep->scounts[EPSC_TRANS] );
144 Dump stats for a route entry in the table.
146 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
148 rtable_ent_t const* rte; // thing is really an rte
152 if( (rte = (rtable_ent_t *) thing) == NULL ) {
153 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
157 if( (counter = (int *) vcounter) != NULL ) {
161 mtype = rte->key & 0xffff;
162 sid = (int) (rte->key >> 32);
164 rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
168 Given a route table, cause some stats to be spit out.
170 static void rt_stats( route_table_t* rt ) {
174 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
178 counter = (int *) malloc( sizeof( int ) );
180 rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
181 rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
182 rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
183 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
185 rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
187 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
188 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
190 rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
192 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
193 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
199 Given a route table, cause endpoint counters to be written to stderr. The id
200 parm is written as the "source" in the output.
202 static void rt_epcounts( route_table_t* rt, char* id ) {
204 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
208 rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
211 // ------------ route manager communication -------------------------------------------------
213 Send a request for a table update to the route manager. Updates come in
214 async, so send and go.
216 pctx is the private context for the thread; ctx is the application context
217 that we need to be able to send the application ID in case rt mgr needs to
218 use it to idenfity us.
220 Returns 0 if we were not able to send a request.
222 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
226 if( ctx->rtg_whid < 0 ) {
230 smsg = rmr_alloc_msg( pctx, 1024 );
232 smsg->mtype = RMRRM_REQ_TABLE;
234 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
235 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
236 smsg->len = strlen( smsg->payload ) + 1;
238 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
239 if( (state = smsg->state) != RMR_OK ) {
240 rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
241 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
245 rmr_free_msg( smsg );
252 Send an ack to the route table manager for a table ID that we are
253 processing. State is 1 for OK, and 0 for failed. Reason might
254 be populated if we know why there was a failure.
256 Context should be the PRIVATE context that we use for messages
257 to route manger and NOT the user's context.
259 If a message buffere is passed we use that and use return to sender
260 assuming that this might be a response to a call and that is needed
261 to send back to the proper calling thread. If msg is nil, we allocate
264 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
266 int payload_size = 1024;
268 if( ctx == NULL || ctx->rtg_whid < 0 ) {
272 if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
277 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
280 smsg = rmr_alloc_msg( ctx, payload_size );
284 smsg->mtype = RMRRM_TABLE_STATE;
286 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
287 table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
289 smsg->len = strlen( smsg->payload ) + 1;
291 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
293 smsg = rmr_rts_msg( ctx, smsg );
295 smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
297 if( (state = smsg->state) != RMR_OK ) {
298 rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
299 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
304 rmr_free_msg( smsg ); // if not our message we must free the leftovers
309 // ------------------------------------------------------------------------------------------------
311 Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
312 must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
314 static char* clip( char* buf ) {
317 while( *buf && isspace( *buf ) ) { // skip leading whitespace
321 if( (tok = strchr( buf, '#' )) != NULL ) {
323 return buf; // just push back; leading comment sym handled there
326 if( isspace( *(tok-1) ) ) {
331 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
338 This accepts a pointer to a nil terminated string, and ensures that there is a
339 newline as the last character. If there is not, a new buffer is allocated and
340 the newline is added. If a new buffer is allocated, the buffer passed in is
341 freed. The function returns a pointer which the caller should use, and must
342 free. In the event of an error, a nil pointer is returned.
344 static char* ensure_nlterm( char* buf ) {
352 nb = buf; // default to returning original as is
359 if( *buf != '\n' ) { // not a newline; realloc
360 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
361 nb = strdup( " \n" );
368 if( buf[len-1] != '\n' ) { // not newline terminated, realloc
369 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
370 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
371 memcpy( nb, buf, len );
372 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
384 Given a message type create a route table entry and add to the hash keyed on the
385 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
386 is the number of group slots to allocate in the entry.
388 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
390 rtable_ent_t* old_rte; // entry which was already in the table for the key
396 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
397 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
400 memset( rte, 0, sizeof( *rte ) );
404 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
409 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
413 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
415 rte->rrgroups = NULL;
418 rte->nrrgroups = nrrgroups;
420 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
421 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
424 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
426 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
431 This accepts partially parsed information from an rte or mse record sent by route manager or read from
433 ts_field is the msg-type,sender field
434 subid is the integer subscription id
435 rr_field is the endpoint information for round robening message over
437 If all goes well, this will add an RTE to the table under construction.
439 The ts_field is checked to see if we should ingest this record. We ingest if one of
441 there is no sender info (a generic entry for all)
442 there is sender and our host:port matches one of the senders
443 the sender info is an IP address that matches one of our IP addresses
445 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
446 rtable_ent_t* rte; // route table entry added
449 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
453 int ngtoks; // number of tokens in the group list
454 int grp; // index into group list
456 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
457 rr_field = clip( rr_field );
459 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
460 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
461 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
463 key = build_rt_key( subid, atoi( ts_field ) );
465 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
467 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
468 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
469 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
471 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
472 rte->mtype = atoi( ts_field ); // capture mtype for debugging
474 for( grp = 0; grp < ngtoks; grp++ ) {
475 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
476 for( i = 0; i < ntoks; i++ ) {
477 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
478 if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
479 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
486 if( DEBUG || (vlevel > 2) ) {
487 rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
493 Trash_entry takes a partially parsed record from the input and
494 will delete the entry if the sender,mtype matches us or it's a
495 generic mtype. The refernce in the new table is removed and the
496 refcounter for the actual rte is decreased. If that ref count is
497 0 then the memory is freed (handled byh the del_rte call).
499 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
500 rtable_ent_t* rte; // route table entry to be 'deleted'
503 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
506 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
510 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
512 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
513 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
514 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
516 key = build_rt_key( subid, atoi( ts_field ) );
517 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
519 if( DEBUG || (vlevel > 1) ) {
520 rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
522 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
523 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
525 if( DEBUG || (vlevel > 1) ) {
526 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
530 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
535 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
536 the 'owner' which should be the dns name or IP address of an enpoint
537 the meid_list is a space separated list of me IDs
539 This function assumes the caller has vetted the pointers as needed.
541 For each meid in the list, an entry is pushed into the hash which references the owner
542 endpoint such that when the meid is used to route a message it references the endpoint
545 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
551 endpoint_t* ep; // endpoint struct for the owner
553 owner = clip( owner ); // ditch extra whitespace and trailing comments
554 meid_list = clip( meid_list );
556 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
557 for( i = 0; i < ntoks; i++ ) {
558 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
559 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
560 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
562 rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
568 Given the tokens from an mme_del, delete the listed meid entries from the new
569 table. The list is a space separated list of meids.
571 The meids in the hash reference endpoints which are never deleted and so
572 the only thing that we need to do here is to remove the meid from the hash.
574 This function assumes the caller has vetted the pointers as needed.
576 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
582 if( rtab->hash == NULL ) {
586 meid_list = clip( meid_list );
588 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
589 for( i = 0; i < ntoks; i++ ) {
590 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
591 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
596 Parse a partially parsed meid record. Tokens[0] should be one of:
597 meid_map, mme_ar, mme_del.
599 pctx is the private context needed to return an ack/nack using the provided
600 message buffer with the route managers address info.
602 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
605 if( tokens == NULL || ntoks < 1 ) {
606 return; // silent but should never happen
609 if( ntoks < 2 ) { // must have at least two for any valid request record
610 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
614 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
615 tokens[1] = clip( tokens[1] );
616 if( *(tokens[1]) == 's' ) {
617 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
618 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
619 uta_rt_drop( ctx->new_rtable );
620 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
623 if( ctx->table_id != NULL ) {
624 free( ctx->table_id );
627 ctx->table_id = strdup( clip( tokens[2] ) );
629 ctx->table_id = NULL;
631 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
632 ctx->new_rtable->mupdates = 0;
633 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
635 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
636 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
637 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
638 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
639 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
640 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
641 uta_rt_drop( ctx->new_rtable );
642 ctx->new_rtable = NULL;
646 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
649 if( ctx->new_rtable ) {
650 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
651 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
652 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
653 ctx->new_rtable = NULL;
654 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
655 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
658 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
659 rt_stats( ctx->old_rtable );
660 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
661 rt_stats( ctx->rtable );
664 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
665 ctx->new_rtable = NULL;
673 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
674 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
678 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
679 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
680 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
683 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
684 ctx->new_rtable->mupdates++;
687 if( strcmp( tokens[0], "mme_del" ) == 0 ) {
689 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
692 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
693 ctx->new_rtable->mupdates++;
698 Parse a single record recevied from the route table generator, or read
699 from a static route table file. Start records cause a new table to
700 be started (if a partial table was received it is discarded. Table
701 entry records are added to the currenly 'in progress' table, and an
702 end record causes the in progress table to be finalised and the
703 currently active table is replaced.
705 The updated table will be activated when the *|end record is encountered.
706 However, to allow for a "double" update, where both the meid map and the
707 route table must be updated at the same time, the end indication on a
708 route table (new or update) may specifiy "hold" which indicates that meid
709 map entries are to follow and the updated route table should be held as
710 pending until the end of the meid map is received and validated.
712 CAUTION: we are assuming that there is a single route/meid map generator
713 and as such only one type of update is received at a time; in other
714 words, the sender cannot mix update records and if there is more than
715 one sender process they must synchronise to avoid issues.
718 For a RT update, we expect:
719 newrt | start | <table-id>
720 newrt | end | <count>
721 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
722 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
723 mse| <mtype>[,sender] | <sub-id> | %meid
726 For a meid map update we expect:
727 meid_map | start | <table-id>
728 meid_map | end | <count> | <md5-hash>
729 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
730 mme_del | <meid0> <meid1>...<meidn>
733 The pctx is our private context that must be used to send acks/status
734 messages back to the route manager. The regular ctx is the ctx that
735 the user has been given and thus that's where we have to hang the route
736 table we're working with.
738 If mbuf is given, and we need to ack, then we ack using the mbuf and a
739 return to sender call (allows route manager to use wh_call() to send
740 an update and rts is required to get that back to the right thread).
741 If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
744 static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
746 int ntoks; // number of tokens found in something
748 int grp; // group number
749 rtable_ent_t const* rte; // route table entry added
751 char* tok; // pointer into a token or string
758 while( *buf && isspace( *buf ) ) { // skip leading whitespace
761 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
764 memset( tokens, 0, sizeof( tokens ) );
765 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
766 tokens[0] = clip( tokens[0] );
767 switch( *(tokens[0]) ) {
768 case 0: // ignore blanks
770 case '#': // and comment lines
773 case 'd': // del | [sender,]mtype | sub-id
774 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
779 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
783 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
784 ctx->new_rtable->updates++;
787 case 'n': // newrt|{start|end}
788 tokens[1] = clip( tokens[1] );
789 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
791 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
792 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
793 ctx->new_rtable->updates, tokens[2] );
794 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
795 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
796 uta_rt_drop( ctx->new_rtable );
797 ctx->new_rtable = NULL;
802 if( ctx->new_rtable ) {
803 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
804 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
805 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
806 ctx->new_rtable = NULL;
807 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
810 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
811 rt_stats( ctx->old_rtable );
812 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
813 rt_stats( ctx->rtable );
816 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
817 ctx->rtable_ready = 1; // route based sends can now happen
819 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
820 ctx->new_rtable = NULL;
822 } else { // start a new table.
823 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
824 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
826 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
827 uta_rt_drop( ctx->new_rtable );
830 if( ctx->table_id != NULL ) {
831 free( ctx->table_id );
834 ctx->table_id = strdup( clip( tokens[2] ) );
836 ctx->table_id = NULL;
839 ctx->new_rtable = NULL;
840 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table
841 ctx->new_rtable->updates = 0; // init count of entries received
842 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
846 case 'm': // mse entry or one of the meid_ records
847 if( strcmp( tokens[0], "mse" ) == 0 ) {
848 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
853 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
857 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
858 ctx->new_rtable->updates++;
860 meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
864 case 'r': // assume rt entry
865 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
869 ctx->new_rtable->updates++;
870 if( ntoks > 3 ) { // assume new entry with subid last
871 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
873 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
877 case 'u': // update current table, not a total replacement
878 tokens[1] = clip( tokens[1] );
879 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
880 if( ctx->new_rtable == NULL ) { // update table not in progress
885 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
886 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
887 ctx->new_rtable->updates, tokens[2] );
888 uta_rt_drop( ctx->new_rtable );
889 ctx->new_rtable = NULL;
894 if( ctx->new_rtable ) {
895 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
896 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
897 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
898 ctx->new_rtable = NULL;
899 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
902 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
903 rt_stats( ctx->old_rtable );
904 rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
905 rt_stats( ctx->rtable );
908 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
909 ctx->new_rtable = NULL;
911 } else { // start a new table.
912 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
913 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
914 uta_rt_drop( ctx->new_rtable );
918 if( ctx->table_id != NULL ) {
919 free( ctx->table_id );
921 ctx->table_id = strdup( clip( tokens[2] ) );
924 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
925 ctx->new_rtable->updates = 0; // init count of updates received
926 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
931 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
938 This function attempts to open a static route table in order to create a 'seed'
939 table during initialisation. The environment variable RMR_SEED_RT is expected
940 to contain the necessary path to the file. If missing, or if the file is empty,
941 no route table will be available until one is received from the generator.
943 This function is probably most useful for testing situations, or extreme
944 cases where the routes are static.
946 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
949 char* fbuf; // buffer with file contents
950 char* rec; // start of the record
951 char* eor; // end of the record
952 int rcount = 0; // record count for debug
954 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
958 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
959 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
963 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
964 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
966 *eor = '\n'; // will look like a blank line which is ok
971 while( rec && *rec ) {
973 if( (eor = strchr( rec, '\n' )) != NULL ) {
976 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
977 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
982 parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
987 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
992 Callback driven for each named thing in a symtab. We collect the pointers to those
993 things for later use (cloning).
995 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
998 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1002 if( thing == NULL ) {
1003 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1007 tl->names[tl->nused] = name; // the name/key
1008 tl->things[tl->nused++] = thing; // save a reference to the thing
1012 Called to delete a route table entry struct. We delete the array of endpoint
1013 pointers, but NOT the endpoints referenced as those are referenced from
1016 Route table entries can be concurrently referenced by multiple symtabs, so
1017 the actual delete happens only if decrementing the rte's ref count takes it
1018 to 0. Thus, it is safe to call this function across a symtab when cleaning up
1019 the symtab, or overlaying an entry.
1021 This function uses ONLY the pointer to the rte (thing) and ignores the other
1022 information that symtab foreach function passes (st, entry, and data) which
1023 means that it _can_ safetly be used outside of the foreach setting. If
1024 the function is changed to depend on any of these three, then a stand-alone
1025 rte_cleanup() function should be added and referenced by this, and refererences
1026 to this outside of the foreach world should be changed.
1028 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1032 if( (rte = (rtable_ent_t *) thing) == NULL ) {
1033 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1038 if( rte->refs > 0 ) { // something still referencing, so it lives
1042 if( rte->rrgroups ) { // clean up the round robin groups
1043 for( i = 0; i < rte->nrrgroups; i++ ) {
1044 if( rte->rrgroups[i] ) {
1045 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
1049 free( rte->rrgroups );
1052 free( rte ); // finally, drop the potato
1056 Read an entire file into a buffer. We assume for route table files
1057 they will be smallish and so this won't be a problem.
1058 Returns a pointer to the buffer, or nil. Caller must free.
1059 Terminates the buffer with a nil character for string processing.
1061 If we cannot stat the file, we assume it's empty or missing and return
1062 an empty buffer, as opposed to a nil, so the caller can generate defaults
1063 or error if an empty/missing file isn't tolerated.
1065 static char* uta_fib( char const* fname ) {
1067 off_t fsize = 8192; // size of the file
1068 off_t nread; // number of bytes read
1070 char* buf; // input buffer
1072 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1073 if( fstat( fd, &stats ) >= 0 ) {
1074 if( stats.st_size <= 0 ) { // empty file
1078 fsize = stats.st_size; // stat ok, save the file size
1081 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
1085 if( fd < 0 ) { // didn't open or empty
1086 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1094 // add a size limit check here
1096 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
1102 nread = read( fd, buf, fsize );
1103 if( nread < 0 || nread > fsize ) { // failure of some kind
1105 errno = EFBIG; // likely too much to handle
1117 Create and initialise a route table; Returns a pointer to the table struct.
1119 static route_table_t* uta_rt_init( ) {
1122 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1125 memset( rt, 0, sizeof( *rt ) );
1127 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1136 Clones one of the spaces in the given table.
1137 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1138 Space is the space in the old table to copy. Space 0 uses an integer key and
1139 references rte structs. All other spaces use a string key and reference endpoints.
1141 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
1142 endpoint_t* ep; // an endpoint (ignore sonar complaint about const*)
1143 rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*)
1144 void* sst; // source symtab
1145 void* nst; // new symtab
1146 thing_list_t things; // things from the space to copy
1148 int free_on_err = 0;
1150 if( nrt == NULL ) { // make a new table if needed
1152 nrt = uta_rt_init();
1158 if( srt == NULL ) { // source was nil, just give back the new table
1162 things.nalloc = 2048;
1164 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1165 memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1166 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1167 memset( things.names, 0, sizeof( char * ) * things.nalloc );
1168 if( things.things == NULL ) {
1170 rmr_sym_free( nrt->hash );
1178 sst = srt->hash; // convenience pointers (src symtab)
1181 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
1183 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
1184 for( i = 0; i < things.nused; i++ ) {
1185 if( space ) { // string key, epoint reference
1186 ep = (endpoint_t *) things.things[i];
1187 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
1189 rte = (rtable_ent_t *) things.things[i];
1190 rte->refs++; // rtes can be removed, so we track references
1191 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
1195 free( things.things );
1196 free( (void *) things.names );
1201 Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1202 The endpoint and meid entries in the hash must be preserved.
1204 NOTE: The first call to rt_clone_space() will create the new table and subsequent
1205 calls operate on the new table. The return of subsequent calls can be safely
1206 ignored. There are some code analysers which will claim that there are memory
1207 leaks here; not true as they aren't understanding the logic, just looking at
1208 an ignored return value and assuming it's different than what was passed in.
1210 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1211 endpoint_t* ep; // an endpoint
1212 rtable_ent_t* rte; // a route table entry
1213 route_table_t* nrt = NULL; // new route table
1217 return uta_rt_init(); // no source to clone, just return an empty table
1220 nrt = rt_clone_space( srt, NULL, RT_NAME_SPACE ); // allocate a new one, add endpoint refs
1221 rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new
1227 Creates a new route table and then clones _all_ of the given route table (references
1228 both endpoints AND the route table entries. Needed to support a partial update where
1229 some route table entries will not be deleted if not explicitly in the update and when
1230 we are adding/replacing meid references.
1232 NOTE see note in uta_rt_clone() as it applies here too.
1234 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1235 endpoint_t const* ep; // an endpoint
1236 rtable_ent_t const* rte; // a route table entry
1237 route_table_t* nrt = NULL; // new route table
1241 return uta_rt_init(); // no source to clone, just return an empty table
1244 nrt = rt_clone_space( srt, NULL, RT_MT_SPACE ); // create new, clone all spaces to it
1245 rt_clone_space( srt, nrt, RT_NAME_SPACE );
1246 rt_clone_space( srt, nrt, RT_ME_SPACE );
1252 Given a name, find the endpoint struct in the provided route table.
1254 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1256 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1260 return rmr_sym_get( rt->hash, ep_name, 1 );
1264 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1266 static void uta_rt_drop( route_table_t* rt ) {
1271 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1272 rmr_sym_free( rt->hash ); // free all of the hash related data
1277 Look up and return the pointer to the endpoint stuct matching the given name.
1278 If not in the hash, a new endpoint is created, added to the hash. Should always
1281 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1284 if( !rt || !ep_name || ! *ep_name ) {
1285 rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1290 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1291 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1292 rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1297 ep->notify = 1; // show notification on first connection failure
1298 ep->open = 0; // not connected
1299 ep->addr = uta_h2ip( ep_name );
1300 ep->name = strdup( ep_name );
1301 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1302 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1304 rmr_sym_put( rt->hash, ep_name, 1, ep );
1312 Given a session id and message type build a key that can be used to look up the rte in the route
1313 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1315 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1318 if( sub_id == UNSET_SUBID ) {
1319 key = 0xffffffff00000000 | mtype;
1321 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1328 Given a route table and meid string, find the owner (if known). Returns a pointer to
1329 the endpoint struct or nil.
1331 static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
1332 endpoint_t const* ep; // the ep we found in the hash
1334 if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1338 return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );