3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
49 #include <RIC_message_types.h> // needed for route manager messages
53 Passed to a symtab foreach callback to construct a list of pointers from
56 typedef struct thing_list {
63 // ---- debugging/testing -------------------------------------------------------------------------
66 Dump some stats for an endpoint in the RT. This is generally called to
67 verify endpoints after a table load/change.
// Symtab foreach callback used when dumping a route table: logs one endpoint.
//   st, entry, name -- supplied by the symtab walker; not referenced in the lines visible here
//   thing           -- the endpoint_t being visited
//   vcounter        -- optional pointer to an int owned by the caller
69 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
// thing arrives untyped; a nil endpoint is tested for before anything is logged
73 if( (ep = (endpoint_t *) thing) == NULL ) {
// the counter is optional; presumably bumped once per endpoint -- confirm against the full body
77 if( (counter = (int *) vcounter) != NULL ) {
// one line per endpoint: its name and its open flag
81 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
85 Called to count meid entries in the table. The meid points to an 'owning' endpoint
86 so we can list what we find
// Symtab foreach callback for the meid map: logs one meid together with the endpoint that owns it.
//   st, entry -- supplied by the symtab walker; not referenced in the lines visible here
//   name      -- the symtab key, logged as the meid
//   thing     -- the owning endpoint_t
//   vcounter  -- optional pointer to an int owned by the caller
88 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
// a nil owner is tested for before anything is logged
92 if( (ep = (endpoint_t *) thing) == NULL ) {
// the counter is optional; presumably bumped once per meid -- confirm against the full body
96 if( (counter = (int *) vcounter) != NULL ) {
// meid (the key), then the owner's name and open flag
100 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
104 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
105 the 'source' information and is added to each message.
// Symtab foreach callback: logs the send counters kept for one endpoint.
//   st, entry, name -- supplied by the symtab walker; not referenced in the lines visible here
//   thing           -- the endpoint_t being visited
//   vid             -- string used for the src= field; nil is tested for (handling not visible here)
107 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
// a nil endpoint is tested for before anything is logged
111 if( (ep = (endpoint_t *) thing) == NULL ) {
115 if( (id = (char *) vid) == NULL ) {
// argument order follows the format: ts, src, target, open, succ, fail, hard, soft
119 rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
// timestamp is the current time in seconds
120 (long long) time( NULL ),
// succ
124 ep->scounts[EPSC_GOOD],
// fail is the sum of the two failure counters which are also reported individually
125 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
// hard
126 ep->scounts[EPSC_FAIL],
// soft
127 ep->scounts[EPSC_TRANS] );
131 Dump stats for a route entry in the table.
// Symtab foreach callback: logs one route table entry.
//   st, entry, name -- supplied by the symtab walker; not referenced in the lines visible here
//   thing           -- the rtable_ent_t being visited
//   vcounter        -- optional pointer to an int owned by the caller
133 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
135 rtable_ent_t* rte; // thing is really an rte
// a nil entry is tested for before anything is logged
139 if( (rte = (rtable_ent_t *) thing) == NULL ) {
// the counter is optional; presumably bumped once per entry -- confirm against the full body
143 if( (counter = (int *) vcounter) != NULL ) {
// the key is split for display: message type from the low 16 bits
147 mtype = rte->key & 0xffff;
// and the subscription id from the bits above the low 32
148 sid = (int) (rte->key >> 32);
// NOTE(review): key is printed with %lx; confirm that matches the width of rte->key on all targets
150 rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
154 Given a route table, cause some stats to be spit out.
156 static void rt_stats( route_table_t* rt ) {
160 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
164 counter = (int *) malloc( sizeof( int ) );
166 rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
167 rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
168 rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
169 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
171 rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
173 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
174 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
176 rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
178 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
179 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
185 Given a route table, cause endpoint counters to be written to stderr. The id
186 parm is written as the "source" in the output.
188 static void rt_epcounts( route_table_t* rt, char* id ) {
190 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
194 rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
197 // ------------ route manager communication -------------------------------------------------
199 Send a request for a table update to the route manager. Updates come in
200 async, so send and go.
202 pctx is the private context for the thread; ctx is the application context
203 that we need to be able to send the application ID in case rt mgr needs to
204 use it to identify us.
206 Returns 0 if we were not able to send a request.
// Build an RMRRM_REQ_TABLE message and send it over the wormhole id held in ctx->rtg_whid.
//   pctx -- private context: used to allocate and to send the message
//   ctx  -- application context: supplies my_name and rtg_whid
208 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
// a negative wormhole id is tested first; presumably it means no session with the route manager -- confirm
212 if( ctx->rtg_whid < 0 ) {
// NOTE(review): any nil test on the allocated message is not visible in this view
216 smsg = rmr_alloc_msg( pctx, 1024 );
218 smsg->mtype = RMRRM_REQ_TABLE;
// payload is "<my_name> ts=<seconds>\n"; the limit is the same 1024 that was asked of rmr_alloc_msg
220 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
221 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
// the length sent includes the terminating nil
222 smsg->len = strlen( smsg->payload ) + 1;
// the send hands back a message buffer which replaces smsg; its state field carries the result
224 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
225 if( (state = smsg->state) != RMR_OK ) {
226 rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
// NOTE(review): the send used pctx but the close uses ctx -- confirm which context owns the wormhole
227 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
// the buffer handed back by the send is released here
231 rmr_free_msg( smsg );
238 Send an ack to the route table manager for a table ID that we are
239 processing. State is RMR_OK for OK, and any other value for failed. Reason might
240 be populated if we know why there was a failure.
242 Context should be the PRIVATE context that we use for messages
243 to route manager and NOT the user's context.
// Send an RMRRM_TABLE_STATE message describing the outcome of the table identified by ctx->table_id.
//   ctx    -- context used to allocate and send; it also supplies rtg_whid, flags and table_id
//   state  -- compared with RMR_OK: equal is reported as "OK", anything else as "ERR"
//   reason -- optional text appended to the payload; nil is sent as an empty string
245 static void send_rt_ack( uta_ctx_t* ctx, int state, char* reason ) {
// a nil context (callers reading a static file pass nil) or a negative wormhole id is tested for up front
248 if( ctx == NULL || ctx->rtg_whid < 0 ) {
252 if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
// NOTE(review): any nil test on the allocated message is not visible in this view
256 smsg = rmr_alloc_msg( ctx, 1024 );
258 smsg->mtype = RMRRM_TABLE_STATE;
// payload is "<OK|ERR> <table-id> <reason>\n"; a missing table id is sent as the literal <id-missing>
260 snprintf( smsg->payload, 1024, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
261 ctx->table_id == NULL ? "<id-missing>" : ctx->table_id, reason == NULL ? "" : reason );
// the length sent includes the terminating nil
263 smsg->len = strlen( smsg->payload ) + 1;
// NOTE(review): smsg->state is logged before the send has been attempted, so it is not a send result
265 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
// the send hands back a message buffer which replaces smsg; the state parameter is reused for its result
266 smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
267 if( (state = smsg->state) != RMR_OK ) {
268 rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
269 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
// the buffer handed back by the send is released here
273 rmr_free_msg( smsg );
277 // ------------------------------------------------------------------------------------------------
/*
	Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
	must be at the start of a word (i.e. must be immediately preceded by whitespace).

	The buffer is modified in place. The pointer returned references the first
	non-whitespace character of buf, so it must not be passed to free().

	A buffer whose first non-blank character is the comment symbol is pushed
	back untouched; the caller is expected to recognise a comment line.
*/
static char* clip( char* buf ) {
	char*	tok;
	size_t	len;

	while( *buf && isspace( (unsigned char) *buf ) ) {			// skip leading whitespace
		buf++;
	}

	if( *buf == '#' ) {
		return buf;					// just push back; leading comment sym handled there
	}

	// a '#' inside a word is data; keep looking until one is found at the start of a word
	for( tok = strchr( buf, '#' ); tok != NULL; tok = strchr( tok + 1, '#' ) ) {
		if( isspace( (unsigned char) *(tok-1) ) ) {				// tok > buf is certain as buf[0] is not '#'
			*tok = 0;
			break;
		}
	}

	len = strlen( buf );										// trim trailing spaces too; safe on an empty string
	while( len > 0 && isspace( (unsigned char) buf[len-1] ) ) {
		len--;
	}
	buf[len] = 0;

	return buf;
}
306 This accepts a pointer to a nil terminated string, and ensures that there is a
307 newline as the last character. If there is not, a new buffer is allocated and
308 the newline is added. If a new buffer is allocated, the buffer passed in is
309 freed. The function returns a pointer which the caller should use, and must
310 free. In the event of an error, a nil pointer is returned.
312 static char* ensure_nlterm( char* buf ) {
317 if( buf == NULL || (len = strlen( buf )) < 2 ) {
318 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
323 if( buf[len-1] != '\n' ) {
324 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
325 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
326 memcpy( nb, buf, len );
327 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
339 Given a message type create a route table entry and add to the hash keyed on the
340 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
341 is the number of group slots to allocate in the entry.
// Allocate a route table entry with room for nrrgroups group pointers and map it into rt->hash under key.
//   rt        -- table under construction; its hash receives the entry
//   key       -- numeric symtab key
//   nrrgroups -- number of group slots wanted; zero is legal and leaves rrgroups nil
// Any entry already mapped under key is pulled from the hash and handed to del_rte().
343 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
345 rtable_ent_t* old_rte; // entry which was already in the table for the key
351 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
352 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
// every field starts out zero
355 memset( rte, 0, sizeof( *rte ) );
// the handling of a negative count is not visible in this view
359 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
// the group array holds pointers only and is zeroed, so every slot starts out nil
364 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
368 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
370 rte->rrgroups = NULL;
373 rte->nrrgroups = nrrgroups;
// an existing entry for the key is taken out first so that the new entry replaces it
375 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
376 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
379 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
381 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
386 This accepts partially parsed information from an rte or mse record sent by route manager or read from
388 ts_field is the msg-type,sender field
389 subid is the integer subscription id
390 rr_field is the endpoint information for round robening message over
392 If all goes well, this will add an RTE to the table under construction.
394 The ts_field is checked to see if we should ingest this record. We ingest if one of
396 there is no sender info (a generic entry for all)
397 there is sender and our host:port matches one of the senders
398 the sender info is an IP address that matches one of our IP addresses
400 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
401 rtable_ent_t* rte; // route table entry added
404 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
408 int ngtoks; // number of tokens in the group list
409 int grp; // index into group list
411 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
412 rr_field = clip( rr_field );
414 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
415 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
416 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
418 key = build_rt_key( subid, atoi( ts_field ) );
420 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
422 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
423 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
424 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
426 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
427 rte->mtype = atoi( ts_field ); // capture mtype for debugging
429 for( grp = 0; grp < ngtoks; grp++ ) {
430 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
431 for( i = 0; i < ntoks; i++ ) {
432 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
433 if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
434 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
441 if( DEBUG || (vlevel > 2) ) {
442 rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
448 Trash_entry takes a partially parsed record from the input and
449 will delete the entry if the sender,mtype matches us or it's a
450 generic mtype. The reference in the new table is removed and the
451 refcounter for the actual rte is decreased. If that ref count is
452 0 then the memory is freed (handled by the del_rte call).
// Remove the route table entry selected by ts_field and subid from the table under construction.
//   ctx      -- context; supplies new_rtable, my_name and ip_list
//   ts_field -- the msg-type,sender field of the record; clipped in place
//   subid    -- subscription id used with the message type to build the key
//   vlevel   -- caller's verbosity; values above 1 cause the work to be logged
454 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
455 rtable_ent_t* rte; // route table entry to be 'deleted'
458 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
// nothing can be deleted unless there is a table in progress with a hash
461 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
465 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
// the record applies to us under the same three conditions that are used when entries are built
467 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
468 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
469 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
// atoi stops at the first non-digit, so a sender list after the message type is ignored
471 key = build_rt_key( subid, atoi( ts_field ) );
472 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
474 if( DEBUG || (vlevel > 1) ) {
475 rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
477 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
478 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
480 if( DEBUG || (vlevel > 1) ) {
481 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
485 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
490 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
491 the 'owner' which should be the dns name or IP address of an enpoint
492 the meid_list is a space separated list of me IDs
494 This function assumes the caller has vetted the pointers as needed.
496 For each meid in the list, an entry is pushed into the hash which references the owner
497 endpoint such that when the meid is used to route a message it references the endpoint
500 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
506 endpoint_t* ep; // endpoint struct for the owner
508 owner = clip( owner ); // ditch extra whitespace and trailing comments
509 meid_list = clip( meid_list );
511 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
512 for( i = 0; i < ntoks; i++ ) {
513 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
514 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
515 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
517 rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
523 Given the tokens from an mme_del, delete the listed meid entries from the new
524 table. The list is a space separated list of meids.
526 The meids in the hash reference endpoints which are never deleted and so
527 the only thing that we need to do here is to remove the meid from the hash.
529 This function assumes the caller has vetted the pointers as needed.
// Delete each meid named in meid_list from the RT_ME_SPACE of rtab->hash.
//   rtab      -- table under construction; dereferenced without a nil test, so the caller must vet it
//   meid_list -- space separated meids; clipped and tokenised in place
//   vlevel    -- caller's verbosity; values above 1 cause each delete to be logged
531 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
// a table without a hash is tested for before any work is done
537 if( rtab->hash == NULL ) {
541 meid_list = clip( meid_list );
// at most 128 meids are taken from one record
543 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
544 for( i = 0; i < ntoks; i++ ) {
// only the hash reference is removed here
545 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
546 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
551 Parse a partially parsed meid record. Tokens[0] should be one of:
552 meid_map, mme_ar, mme_del.
// Act on one tokenised meid record: meid_map start/end, mme_ar (add/replace) or mme_del.
//   ctx    -- context whose rtable, old_rtable and new_rtable pointers are managed here
//   tokens -- the record split into fields; tokens[0] names the record type
//   ntoks  -- number of usable entries in tokens
//   vlevel -- caller's verbosity; values above 1 cause progress to be logged
554 static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
555 if( tokens == NULL || ntoks < 1 ) {
556 return; // silent but should never happen
559 if( ntoks < 2 ) { // must have at least two for any valid request record
560 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
564 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
565 tokens[1] = clip( tokens[1] );
// only the first character is examined, so anything beginning with 's' is taken as start
566 if( *(tokens[1]) == 's' ) {
567 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
568 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
569 uta_rt_drop( ctx->new_rtable );
// NOTE(review): the clone result is dereferenced on the next line with no nil test visible
572 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
573 ctx->new_rtable->mupdates = 0;
574 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
576 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
577 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
// NOTE(review): new_rtable is dereferenced before the nil test further down; an end record
// carrying a count with no preceding start looks like a nil dereference -- confirm
578 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
579 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
// a count mismatch discards the table that was being built
580 uta_rt_drop( ctx->new_rtable );
581 ctx->new_rtable = NULL;
585 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
// table rotation: old is dropped, active becomes old, new becomes active
588 if( ctx->new_rtable ) {
589 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
590 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
591 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
592 ctx->new_rtable = NULL;
593 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
// both tables are dumped after the swap (the condition guarding the dump is not visible in this view)
596 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
597 rt_stats( ctx->old_rtable );
598 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
599 rt_stats( ctx->rtable );
602 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
603 ctx->new_rtable = NULL;
611 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
612 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
// mme_ar needs the owner (tokens[1]) and the meid list (tokens[2])
616 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
617 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
618 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
// each mme_ar record counts as one update regardless of how many meids it lists
621 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
622 ctx->new_rtable->mupdates++;
// mme_del carries only the meid list (tokens[1])
625 if( strcmp( tokens[0], "mme_del" ) == 0 ) {
627 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
630 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
631 ctx->new_rtable->mupdates++;
636 Parse a single record received from the route table generator, or read
637 from a static route table file. Start records cause a new table to
638 be started (if a partial table was received it is discarded). Table
639 entry records are added to the currently 'in progress' table, and an
640 end record causes the in progress table to be finalised and the
641 currently active table is replaced.
643 The updated table will be activated when the *|end record is encountered.
644 However, to allow for a "double" update, where both the meid map and the
645 route table must be updated at the same time, the end indication on a
646 route table (new or update) may specify "hold" which indicates that meid
647 map entries are to follow and the updated route table should be held as
648 pending until the end of the meid map is received and validated.
650 CAUTION: we are assuming that there is a single route/meid map generator
651 and as such only one type of update is received at a time; in other
652 words, the sender cannot mix update records and if there is more than
653 one sender process they must synchronise to avoid issues.
656 For a RT update, we expect:
657 newrt | start | <table-id>
658 newrt | end | <count>
659 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
660 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
661 mse| <mtype>[,sender] | <sub-id> | %meid
664 For a meid map update we expect:
665 meid_map | start | <table-id>
666 meid_map | end | <count> | <md5-hash>
667 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
668 mme_del | <meid0> <meid1>...<meidn>
671 The pctx is our private context that must be used to send acks/status
672 messages back to the route manager. The regular ctx is the ctx that
673 the user has been given and thus that's where we have to hang the route
674 table we're working with.
// Parse one route table or meid map record and apply it to the table under construction.
//   ctx    -- user context: the rtable, old_rtable, new_rtable and table_id it holds are managed here
//   pctx   -- private context handed to send_rt_ack(); nil when the record came from a static file
//   buf    -- the record; modified in place while it is trimmed and tokenised
//   vlevel -- caller's verbosity; values above 1 cause progress to be logged
// The record type is selected by the first character of the first '|' separated field.
676 static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel ) {
678 int ntoks; // number of tokens found in something
680 int grp; // group number
681 rtable_ent_t* rte; // route table entry added
683 char* tok; // pointer into a token or string
690 while( *buf && isspace( *buf ) ) { // skip leading whitespace
// NOTE(review): for an empty buf, strlen( buf ) - 1 puts tok in front of the buffer -- confirm this cannot happen
693 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
// fields are separated by vertical bars; at most 128 are taken
696 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
697 tokens[0] = clip( tokens[0] );
// dispatch is on the first character only; the 'm' case looks at the whole token
698 switch( *(tokens[0]) ) {
699 case 0: // ignore blanks
701 case '#': // and comment lines
704 case 'd': // del | [sender,]mtype | sub-id
705 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
710 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
// a delete counts as an update when the end record's count is checked
714 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
715 ctx->new_rtable->updates++;
718 case 'n': // newrt|{start|end}
719 tokens[1] = clip( tokens[1] );
720 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
// NOTE(review): new_rtable is dereferenced here before the nil test further down -- confirm that
// an end record without a preceding start cannot reach this line
722 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
723 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
724 ctx->new_rtable->updates, tokens[2] );
725 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
// NOTE(review): this failure is acked with RMR_OK, which send_rt_ack() reports as "OK"; the
// "table not complete" case below passes !RMR_OK -- this looks inverted, confirm
726 send_rt_ack( pctx, RMR_OK, wbuf );
// a count mismatch discards the table that was being built
727 uta_rt_drop( ctx->new_rtable );
728 ctx->new_rtable = NULL;
// table rotation: old is dropped, active becomes old, new becomes active
733 if( ctx->new_rtable ) {
734 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
735 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
736 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
737 ctx->new_rtable = NULL;
738 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
// both tables are dumped after the swap (the condition guarding the dump is not visible in this view)
741 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
742 rt_stats( ctx->old_rtable );
743 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
744 rt_stats( ctx->rtable );
// a positive ack goes out once the new table is active
747 send_rt_ack( pctx, RMR_OK, NULL );
749 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
750 ctx->new_rtable = NULL;
752 } else { // start a new table.
753 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
754 send_rt_ack( pctx, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
756 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
757 uta_rt_drop( ctx->new_rtable );
// the id of the table now being started replaces any id saved earlier
760 if( ctx->table_id != NULL ) {
761 free( ctx->table_id );
764 ctx->table_id = strdup( tokens[2] );
766 ctx->table_id = NULL;
769 ctx->new_rtable = NULL;
// NOTE(review): the clone result is dereferenced on the next line with no nil test visible
770 ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meid entries from active table
771 ctx->new_rtable->updates = 0; // init count of entries received
772 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
776 case 'm': // mse entry or one of the meid_ records
777 if( strcmp( tokens[0], "mse" ) == 0 ) {
778 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
783 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
// mse field order: mtype[,sender] | sub-id | endpoint groups
787 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
788 ctx->new_rtable->updates++;
// anything else that starts with 'm' is handed to the meid record parser
790 meid_parser( ctx, tokens, ntoks, vlevel );
794 case 'r': // assume rt entry
795 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
// NOTE(review): no field count test is visible before tokens[1] and tokens[2] are used below
799 ctx->new_rtable->updates++;
// rte field order differs from mse: the endpoint groups are third and an optional sub-id is fourth
800 if( ntoks > 3 ) { // assume new entry with subid last
801 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
803 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
807 case 'u': // update current table, not a total replacement
808 tokens[1] = clip( tokens[1] );
809 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
810 if( ctx->new_rtable == NULL ) { // update table not in progress
815 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
816 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
817 ctx->new_rtable->updates, tokens[2] );
// a count mismatch discards the update; no ack call is visible in this branch
818 uta_rt_drop( ctx->new_rtable );
819 ctx->new_rtable = NULL;
// table rotation: old is dropped, active becomes old, new becomes active
824 if( ctx->new_rtable ) {
825 uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
826 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
827 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
828 ctx->new_rtable = NULL;
829 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
832 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
833 rt_stats( ctx->old_rtable );
834 rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
835 rt_stats( ctx->rtable );
838 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
839 ctx->new_rtable = NULL;
841 } else { // start a new table.
842 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
843 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
844 uta_rt_drop( ctx->new_rtable );
848 if( ctx->table_id != NULL ) {
849 free( ctx->table_id );
851 ctx->table_id = strdup( tokens[2] );
// an update starts from a clone of everything; the 'n' start case above uses uta_rt_clone() instead
// NOTE(review): the clone result is dereferenced on the next line with no nil test visible
854 ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
855 ctx->new_rtable->updates = 0; // init count of updates received
856 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
861 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
868 This function attempts to open a static route table in order to create a 'seed'
869 table during initialisation. The environment variable RMR_SEED_RT is expected
870 to contain the necessary path to the file. If missing, or if the file is empty,
871 no route table will be available until one is received from the generator.
873 This function is probably most useful for testing situations, or extreme
874 cases where the routes are static.
// Load the seed route table named by the ENV_SEED_RT environment variable, one record at a time.
//   ctx    -- context which receives the table by way of parse_rt_rec()
//   vlevel -- verbosity passed through to parse_rt_rec()
876 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
879 char* fbuf; // buffer with file contents
880 char* rec; // start of the record
881 char* eor; // end of the record
882 int rcount = 0; // record count for debug
// without the environment variable there is no file name to work with
884 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
// NOTE(review): errno is reported here after two nested calls; confirm it still describes the open/read failure
888 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
889 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
893 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
// first pass over the whole buffer normalises the record terminators
894 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
896 *eor = '\n'; // will look like a blank line which is ok
// second pass: rec walks from record to record; eor marks the newline which ends the current one
900 for( rec = fbuf; rec && *rec; rec = eor+1 ) {
902 if( (eor = strchr( rec, '\n' )) != NULL ) {
// a record with no newline causes the table to be rejected
905 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
906 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
911 parse_rt_rec( ctx, NULL, rec, vlevel ); // no pvt context as we can't ack
914 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
919 Callback driven for each named thing in a symtab. We collect the pointers to those
920 things for later use (cloning).
922 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
925 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
929 if( thing == NULL ) {
933 tl->names[tl->nused] = name; // the name/key
934 tl->things[tl->nused++] = thing; // save a reference to the thing
938 Called to delete a route table entry struct. We delete the array of endpoint
939 pointers, but NOT the endpoints referenced as those are referenced from
942 Route table entries can be concurrently referenced by multiple symtabs, so
943 the actual delete happens only if decrementing the rte's ref count takes it
944 to 0. Thus, it is safe to call this function across a symtab when cleaning up
945 the symtab, or overlaying an entry.
947 This function uses ONLY the pointer to the rte (thing) and ignores the other
948 information that symtab foreach function passes (st, entry, and data) which
949 means that it _can_ safetly be used outside of the foreach setting. If
950 the function is changed to depend on any of these three, then a stand-alone
951 rte_cleanup() function should be added and referenced by this, and refererences
952 to this outside of the foreach world should be changed.
954 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
958 if( (rte = (rtable_ent_t *) thing) == NULL ) {
963 if( rte->refs > 0 ) { // something still referencing, so it lives
967 if( rte->rrgroups ) { // clean up the round robin groups
968 for( i = 0; i < rte->nrrgroups; i++ ) {
969 if( rte->rrgroups[i] ) {
970 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
974 free( rte->rrgroups );
977 free( rte ); // finally, drop the potato
/*
	Read an entire file into a buffer. We assume for route table files
	they will be smallish and so this won't be a problem.
	Returns a pointer to the buffer, or nil. Caller must free.
	Terminates the buffer with a nil character for string processing.

	If we cannot stat the file, we assume it's empty or missing and return
	an empty buffer, as opposed to a nil, so the caller can generate defaults
	or error if an empty/missing file isn't tolerated.

	A nil return means that memory could not be allocated (errno is ENOMEM when
	the file buffer was involved) or that a read failed, in which case errno is
	left as it was set by read().
*/
static char* uta_fib( char* fname ) {
	struct stat	stats;
	off_t		fsize = 8192;	// size of the file
	off_t		nread = 0;		// total number of bytes read
	ssize_t		rlen;			// number of bytes from the most recent read
	int			fd;
	int			rerrno;			// errno from a failed read, kept safe across free/close
	char*		buf;			// input buffer

	if( (fd = open( fname, O_RDONLY )) >= 0 ) {
		if( fstat( fd, &stats ) >= 0 ) {
			if( stats.st_size <= 0 ) {					// empty file
				close( fd );
				fd = -1;
			} else {
				fsize = stats.st_size;						// stat ok, save the file size
			}
		} else {
			fsize = 8192; 								// stat failed, we'll leave the file open and try to read a default max of 8k
		}
	}

	if( fd < 0 ) {											// didn't open or empty
		if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
			return NULL;
		}

		*buf = 0;
		return buf;
	}

	// add a size limit check here

	if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {		// enough to add nil char to make string
		close( fd );
		errno = ENOMEM;
		return NULL;
	}

	while( nread < fsize ) {								// read() may legitimately return less than was asked for
		rlen = read( fd, buf + nread, (size_t) (fsize - nread) );
		if( rlen < 0 ) {
			if( errno == EINTR ) {							// interrupted before any data moved; just try again
				continue;
			}

			rerrno = errno;
			free( buf );
			close( fd );
			errno = rerrno;									// report the real reason for the failure
			return NULL;
		}

		if( rlen == 0 ) {									// end of file before the expected size; use what we have
			break;
		}

		nread += rlen;
	}

	buf[nread] = 0;

	close( fd );
	return buf;
}
1042 Create and initialise a route table; Returns a pointer to the table struct.
1044 static route_table_t* uta_rt_init( ) {
1047 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1051 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1060 Clones one of the spaces in the given table.
1061 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1062 Space is the space in the old table to copy. Space 0 uses an integer key and
1063 references rte structs. All other spaces use a string key and reference endpoints.
1065 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
1066 endpoint_t* ep; // an endpoint
1067 rtable_ent_t* rte; // a route table entry
1068 void* sst; // source symtab
1069 void* nst; // new symtab
1070 thing_list_t things; // things from the space to copy
1072 int free_on_err = 0;
1074 if( nrt == NULL ) { // make a new table if needed
1076 nrt = uta_rt_init();
1079 if( srt == NULL ) { // source was nil, just give back the new table
1083 things.nalloc = 2048;
1085 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1086 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1087 if( things.things == NULL ) {
1097 sst = srt->hash; // convenience pointers (src symtab)
1100 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
1102 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
1103 for( i = 0; i < things.nused; i++ ) {
1104 if( space ) { // string key, epoint reference
1105 ep = (endpoint_t *) things.things[i];
1106 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
1108 rte = (rtable_ent_t *) things.things[i];
1109 rte->refs++; // rtes can be removed, so we track references
1110 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
1114 free( things.things );
1115 free( (void *) things.names );
1120 Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1121 The endpoint and meid entries in the hash must be preserved.
1123 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1124 endpoint_t* ep; // an endpoint
1125 rtable_ent_t* rte; // a route table entry
1126 route_table_t* nrt = NULL; // new route table
1130 return uta_rt_init(); // no source to clone, just return an empty table
1133 nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE ); // allocate a new one, add endpoint refs
1134 rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new
1140 Creates a new route table and then clones _all_ of the given route table (references
1141 both endpoints AND the route table entries. Needed to support a partial update where
1142 some route table entries will not be deleted if not explicitly in the update and when
1143 we are adding/replacing meid references.
1145 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1146 endpoint_t* ep; // an endpoint
1147 rtable_ent_t* rte; // a route table entry
1148 route_table_t* nrt = NULL; // new route table
1152 return uta_rt_init(); // no source to clone, just return an empty table
1155 nrt = rt_clone_space( srt, nrt, RT_MT_SPACE ); // create new, clone all spaces to it
1156 rt_clone_space( srt, nrt, RT_NAME_SPACE );
1157 rt_clone_space( srt, nrt, RT_ME_SPACE );
1163 Given a name, find the endpoint struct in the provided route table.
1165 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1167 if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1171 return rmr_sym_get( rt->hash, ep_name, 1 );
1175 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1177 static void uta_rt_drop( route_table_t* rt ) {
1182 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1183 rmr_sym_free( rt->hash ); // free all of the hash related data
1188 Look up and return the pointer to the endpoint stuct matching the given name.
1189 If not in the hash, a new endpoint is created, added to the hash. Should always
1192 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1195 if( !rt || !ep_name || ! *ep_name ) {
1196 rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1201 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1202 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1203 rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1208 ep->notify = 1; // show notification on first connection failure
1209 ep->open = 0; // not connected
1210 ep->addr = uta_h2ip( ep_name );
1211 ep->name = strdup( ep_name );
1212 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1213 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1215 rmr_sym_put( rt->hash, ep_name, 1, ep );
1223 Given a session id and message type build a key that can be used to look up the rte in the route
1224 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1226 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1229 if( sub_id == UNSET_SUBID ) {
1230 key = 0xffffffff00000000 | mtype;
1232 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);