3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License") ;
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rt_generic_static.c
23 Abstract: These are route table functions which are not specific to the
24 underlying protocol. rtable_static, and rtable_nng_static
25 have transport provider specific code.
27 This file must be included before the nng/nano specific file as
30 Author: E. Scott Daniels
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
44 #include <sys/types.h>
50 #include <RIC_message_types.h> // needed for route manager messages
56 Passed to a symtab foreach callback to construct a list of pointers from
59 typedef struct thing_list {
66 // ---- debugging/testing -------------------------------------------------------------------------
69 Dump some stats for an endpoint in the RT. This is generally called to
70 verify endpoints after a table load/change.
72 This is called by the for-each mechanism of the symtab and the prototype is
73 fixe; we don't really use some of the parms, but have dummy references to
74 keep sonar from complaining.
76 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
80 if( (ep = (endpoint_t *) thing) == NULL ) {
84 if( (counter = (int *) vcounter) != NULL ) {
87 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
90 rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
94 Called to count meid entries in the table. The meid points to an 'owning' endpoint
95 so we can list what we find
97 See note in ep_stats about dummy refs.
99 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
103 if( (ep = (endpoint_t *) thing) == NULL ) {
107 if( (counter = (int *) vcounter) != NULL ) {
110 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
113 rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
117 Dump counts for an endpoint in the RT. The vid parm is assumed to point to
118 the 'source' information and is added to each message.
120 See note above about dummy references.
122 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
126 if( (ep = (endpoint_t *) thing) == NULL ) {
127 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
131 if( (id = (char *) vid) == NULL ) {
135 rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
136 (long long) time( NULL ),
140 ep->scounts[EPSC_GOOD],
141 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
142 ep->scounts[EPSC_FAIL],
143 ep->scounts[EPSC_TRANS] );
147 Dump stats for a route entry in the table.
149 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
151 rtable_ent_t const* rte; // thing is really an rte
155 if( (rte = (rtable_ent_t *) thing) == NULL ) {
156 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
160 if( (counter = (int *) vcounter) != NULL ) {
164 mtype = rte->key & 0xffff;
165 sid = (int) (rte->key >> 32);
167 rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
171 Given a route table, cause some stats to be spit out.
173 static void rt_stats( route_table_t* rt ) {
177 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
181 counter = (int *) malloc( sizeof( int ) );
183 rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
184 rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
185 rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
186 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
188 rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
190 rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
191 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
193 rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
195 rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
196 rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
202 Given a route table, cause endpoint counters to be written to stderr. The id
203 parm is written as the "source" in the output.
205 static void rt_epcounts( route_table_t* rt, char* id ) {
207 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
211 rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
214 // ------------ route manager communication -------------------------------------------------
216 Send a request for a table update to the route manager. Updates come in
217 async, so send and go.
219 pctx is the private context for the thread; ctx is the application context
220 that we need to be able to send the application ID in case rt mgr needs to
221 use it to idenfity us.
223 Returns 0 if we were not able to send a request.
225 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
229 if( ctx->rtg_whid < 0 ) {
233 smsg = rmr_alloc_msg( pctx, 1024 );
235 smsg->mtype = RMRRM_REQ_TABLE;
237 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
238 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
239 smsg->len = strlen( smsg->payload ) + 1;
241 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
242 if( (state = smsg->state) != RMR_OK ) {
243 rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
244 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
248 rmr_free_msg( smsg );
255 Send an ack to the route table manager for a table ID that we are
256 processing. State is 1 for OK, and 0 for failed. Reason might
257 be populated if we know why there was a failure.
259 Context should be the PRIVATE context that we use for messages
260 to route manger and NOT the user's context.
262 If a message buffere is passed we use that and use return to sender
263 assuming that this might be a response to a call and that is needed
264 to send back to the proper calling thread. If msg is nil, we allocate
267 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
269 int payload_size = 1024;
271 if( ctx == NULL || ctx->rtg_whid < 0 ) {
275 if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
280 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
283 smsg = rmr_alloc_msg( ctx, payload_size );
287 smsg->mtype = RMRRM_TABLE_STATE;
289 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
290 table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
292 smsg->len = strlen( smsg->payload ) + 1;
294 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
296 smsg = rmr_rts_msg( ctx, smsg );
298 smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
300 if( (state = smsg->state) != RMR_OK ) {
301 rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
302 rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
307 rmr_free_msg( smsg ); // if not our message we must free the leftovers
312 // ---- utility -----------------------------------------------------------------------------------
314 Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
315 must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
317 static char* clip( char* buf ) {
320 while( *buf && isspace( *buf ) ) { // skip leading whitespace
324 if( (tok = strchr( buf, '#' )) != NULL ) {
326 return buf; // just push back; leading comment sym handled there
329 if( isspace( *(tok-1) ) ) {
334 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
341 This accepts a pointer to a nil terminated string, and ensures that there is a
342 newline as the last character. If there is not, a new buffer is allocated and
343 the newline is added. If a new buffer is allocated, the buffer passed in is
344 freed. The function returns a pointer which the caller should use, and must
345 free. In the event of an error, a nil pointer is returned.
347 static char* ensure_nlterm( char* buf ) {
355 nb = buf; // default to returning original as is
362 if( *buf != '\n' ) { // not a newline; realloc
363 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
364 nb = strdup( " \n" );
371 if( buf[len-1] != '\n' ) { // not newline terminated, realloc
372 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
373 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
374 memcpy( nb, buf, len );
375 *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
387 Roll the new table into the active and the active into the old table. We
388 must have the lock on the active table to do this. It's possible that there
389 is no active table (first load), so we have to account for that (no locking).
391 static void roll_tables( uta_ctx_t* ctx ) {
393 if( ctx->rtable != NULL ) { // initially there isn't one, so must check!
394 pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active
395 ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
396 ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
397 pthread_mutex_unlock( ctx->rtgate );
399 ctx->old_rtable = NULL; // ensure there isn't an old reference
400 ctx->rtable = ctx->new_rtable; // make new the active one
403 ctx->new_rtable = NULL;
406 // ------------ entry update functions ---------------------------------------------------------------
408 Given a message type create a route table entry and add to the hash keyed on the
409 message type. Once in the hash, endpoints can be added with uta_add_ep. Size
410 is the number of group slots to allocate in the entry.
412 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
414 rtable_ent_t* old_rte; // entry which was already in the table for the key
420 if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
421 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
424 memset( rte, 0, sizeof( *rte ) );
428 if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
433 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
437 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
439 rte->rrgroups = NULL;
442 rte->nrrgroups = nrrgroups;
444 if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
445 del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced
448 rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
450 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
455 This accepts partially parsed information from an rte or mse record sent by route manager or read from
457 ts_field is the msg-type,sender field
458 subid is the integer subscription id
459 rr_field is the endpoint information for round robening message over
461 If all goes well, this will add an RTE to the table under construction.
463 The ts_field is checked to see if we should ingest this record. We ingest if one of
465 there is no sender info (a generic entry for all)
466 there is sender and our host:port matches one of the senders
467 the sender info is an IP address that matches one of our IP addresses
469 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
470 rtable_ent_t* rte; // route table entry added
473 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
477 int ngtoks; // number of tokens in the group list
478 int grp; // index into group list
480 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
481 rr_field = clip( rr_field );
483 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
484 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
485 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
487 key = build_rt_key( subid, atoi( ts_field ) );
489 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
491 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
492 if( strcmp( gtokens[0], "%meid" ) == 0 ) {
493 ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
495 rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
496 rte->mtype = atoi( ts_field ); // capture mtype for debugging
498 for( grp = 0; grp < ngtoks; grp++ ) {
499 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
500 for( i = 0; i < ntoks; i++ ) {
501 if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
502 if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
503 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
510 if( DEBUG || (vlevel > 2) ) {
511 rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
517 Trash_entry takes a partially parsed record from the input and
518 will delete the entry if the sender,mtype matches us or it's a
519 generic mtype. The refernce in the new table is removed and the
520 refcounter for the actual rte is decreased. If that ref count is
521 0 then the memory is freed (handled byh the del_rte call).
523 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
524 rtable_ent_t* rte; // route table entry to be 'deleted'
527 uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
530 if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
534 ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
536 if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
537 (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
538 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
540 key = build_rt_key( subid, atoi( ts_field ) );
541 rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
543 if( DEBUG || (vlevel > 1) ) {
544 rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
546 rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
547 del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
549 if( DEBUG || (vlevel > 1) ) {
550 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
554 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
558 // -------------------------- parse functions --------------------------------------------------
561 Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
562 the 'owner' which should be the dns name or IP address of an enpoint
563 the meid_list is a space separated list of me IDs
565 This function assumes the caller has vetted the pointers as needed.
567 For each meid in the list, an entry is pushed into the hash which references the owner
568 endpoint such that when the meid is used to route a message it references the endpoint
571 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
577 endpoint_t* ep; // endpoint struct for the owner
579 owner = clip( owner ); // ditch extra whitespace and trailing comments
580 meid_list = clip( meid_list );
582 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
583 for( i = 0; i < ntoks; i++ ) {
584 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
585 state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there
586 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
588 rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
594 Given the tokens from an mme_del, delete the listed meid entries from the new
595 table. The list is a space separated list of meids.
597 The meids in the hash reference endpoints which are never deleted and so
598 the only thing that we need to do here is to remove the meid from the hash.
600 This function assumes the caller has vetted the pointers as needed.
602 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
608 if( rtab->hash == NULL ) {
612 meid_list = clip( meid_list );
614 ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
615 for( i = 0; i < ntoks; i++ ) {
616 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away!
617 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
622 Parse a partially parsed meid record. Tokens[0] should be one of:
623 meid_map, mme_ar, mme_del.
625 pctx is the private context needed to return an ack/nack using the provided
626 message buffer with the route managers address info.
628 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
631 if( tokens == NULL || ntoks < 1 ) {
632 return; // silent but should never happen
635 if( ntoks < 2 ) { // must have at least two for any valid request record
636 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
640 if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update
641 tokens[1] = clip( tokens[1] );
642 if( *(tokens[1]) == 's' ) {
643 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
644 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
645 uta_rt_drop( ctx->new_rtable );
646 ctx->new_rtable = NULL;
647 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as and never made it
650 if( ctx->table_id != NULL ) {
651 free( ctx->table_id );
654 ctx->table_id = strdup( clip( tokens[2] ) );
656 ctx->table_id = NULL;
659 ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid)
660 ctx->new_rtable->mupdates = 0;
662 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
664 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
665 if( ntoks > 2 ) { // meid_map | end | <count> |??? given
666 if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
667 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
668 ctx->new_rtable->mupdates, tokens[2] );
669 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
670 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
671 uta_rt_drop( ctx->new_rtable );
672 ctx->new_rtable = NULL;
676 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
679 if( ctx->new_rtable ) {
680 roll_tables( ctx ); // roll active to old, and new to active with proper locking
681 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
682 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
685 if( ctx->old_rtable != NULL ) {
686 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
687 rt_stats( ctx->old_rtable );
689 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
691 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
692 rt_stats( ctx->rtable );
695 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
696 ctx->new_rtable = NULL;
704 if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
705 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
709 if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
710 if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
711 rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
714 parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
715 ctx->new_rtable->mupdates++;
719 if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated
720 parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
721 ctx->new_rtable->mupdates++;
727 Parse a single record recevied from the route table generator, or read
728 from a static route table file. Start records cause a new table to
729 be started (if a partial table was received it is discarded. Table
730 entry records are added to the currenly 'in progress' table, and an
731 end record causes the in progress table to be finalised and the
732 currently active table is replaced.
734 The updated table will be activated when the *|end record is encountered.
735 However, to allow for a "double" update, where both the meid map and the
736 route table must be updated at the same time, the end indication on a
737 route table (new or update) may specifiy "hold" which indicates that meid
738 map entries are to follow and the updated route table should be held as
739 pending until the end of the meid map is received and validated.
741 CAUTION: we are assuming that there is a single route/meid map generator
742 and as such only one type of update is received at a time; in other
743 words, the sender cannot mix update records and if there is more than
744 one sender process they must synchronise to avoid issues.
747 For a RT update, we expect:
748 newrt | start | <table-id>
749 newrt | end | <count>
750 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
751 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
752 mse| <mtype>[,sender] | <sub-id> | %meid
755 For a meid map update we expect:
756 meid_map | start | <table-id>
757 meid_map | end | <count> | <md5-hash>
758 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
759 mme_del | <meid0> <meid1>...<meidn>
762 The pctx is our private context that must be used to send acks/status
763 messages back to the route manager. The regular ctx is the ctx that
764 the user has been given and thus that's where we have to hang the route
765 table we're working with.
767 If mbuf is given, and we need to ack, then we ack using the mbuf and a
768 return to sender call (allows route manager to use wh_call() to send
769 an update and rts is required to get that back to the right thread).
770 If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
773 static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
775 int ntoks; // number of tokens found in something
777 int grp; // group number
778 rtable_ent_t const* rte; // route table entry added
780 char* tok; // pointer into a token or string
787 while( *buf && isspace( *buf ) ) { // skip leading whitespace
790 for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
793 memset( tokens, 0, sizeof( tokens ) );
794 if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
795 tokens[0] = clip( tokens[0] );
796 switch( *(tokens[0]) ) {
797 case 0: // ignore blanks
799 case '#': // and comment lines
802 case 'd': // del | [sender,]mtype | sub-id
803 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
808 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
812 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
813 ctx->new_rtable->updates++;
816 case 'n': // newrt|{start|end}
817 tokens[1] = clip( tokens[1] );
818 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
820 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
821 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
822 ctx->new_rtable->updates, tokens[2] );
823 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
824 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
825 uta_rt_drop( ctx->new_rtable );
826 ctx->new_rtable = NULL;
831 if( ctx->new_rtable ) {
832 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
833 roll_tables( ctx ); // roll active to old, and new to active with proper locking
836 if( ctx->old_rtable != NULL ) {
837 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
838 rt_stats( ctx->old_rtable );
840 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
842 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
843 rt_stats( ctx->rtable );
846 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
847 ctx->rtable_ready = 1; // route based sends can now happen
849 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
850 ctx->new_rtable = NULL;
852 } else { // start a new table.
853 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
854 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
856 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
857 uta_rt_drop( ctx->new_rtable );
858 ctx->new_rtable = NULL;
861 if( ctx->table_id != NULL ) {
862 free( ctx->table_id );
865 ctx->table_id = strdup( clip( tokens[2] ) );
867 ctx->table_id = NULL;
870 ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new
871 ctx->new_rtable->updates = 0; // init count of entries received
873 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
877 case 'm': // mse entry or one of the meid_ records
878 if( strcmp( tokens[0], "mse" ) == 0 ) {
879 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
884 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
888 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
889 ctx->new_rtable->updates++;
891 meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
895 case 'r': // assume rt entry
896 if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
900 ctx->new_rtable->updates++;
901 if( ntoks > 3 ) { // assume new entry with subid last
902 build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
904 build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
908 case 'u': // update current table, not a total replacement
909 tokens[1] = clip( tokens[1] );
910 if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
911 if( ctx->new_rtable == NULL ) { // update table not in progress
916 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
917 rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
918 ctx->new_rtable->updates, tokens[2] );
919 uta_rt_drop( ctx->new_rtable );
920 ctx->new_rtable = NULL;
925 if( ctx->new_rtable ) {
926 roll_tables( ctx ); // roll active to old, and new to active with proper locking
927 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
930 if( ctx->old_rtable != NULL ) {
931 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
932 rt_stats( ctx->old_rtable );
934 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
936 rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
937 rt_stats( ctx->rtable );
940 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
941 ctx->new_rtable = NULL;
943 } else { // start a new table.
944 if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
945 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
946 uta_rt_drop( ctx->new_rtable );
947 ctx->new_rtable = NULL;
951 if( ctx->table_id != NULL ) {
952 free( ctx->table_id );
954 ctx->table_id = strdup( clip( tokens[2] ) );
957 ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table
958 ctx->new_rtable->updates = 0; // init count of updates received
960 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
965 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
972 This function attempts to open a static route table in order to create a 'seed'
973 table during initialisation. The environment variable RMR_SEED_RT is expected
974 to contain the necessary path to the file. If missing, or if the file is empty,
975 no route table will be available until one is received from the generator.
977 This function is probably most useful for testing situations, or extreme
978 cases where the routes are static.
980 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
983 char* fbuf; // buffer with file contents
984 char* rec; // start of the record
985 char* eor; // end of the record
986 int rcount = 0; // record count for debug
988 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
992 if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
993 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
997 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
998 for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
1000 *eor = '\n'; // will look like a blank line which is ok
1005 while( rec && *rec ) {
1007 if( (eor = strchr( rec, '\n' )) != NULL ) {
1010 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1011 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1016 parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
1021 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
1026 Callback driven for each named thing in a symtab. We collect the pointers to those
1027 things for later use (cloning).
1029 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1032 if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1036 if( thing == NULL ) {
1037 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1041 tl->names[tl->nused] = name; // the name/key
1042 tl->things[tl->nused++] = thing; // save a reference to the thing
1046 Called to delete a route table entry struct. We delete the array of endpoint
1047 pointers, but NOT the endpoints referenced as those are referenced from
1050 Route table entries can be concurrently referenced by multiple symtabs, so
1051 the actual delete happens only if decrementing the rte's ref count takes it
1052 to 0. Thus, it is safe to call this function across a symtab when cleaning up
1053 the symtab, or overlaying an entry.
1055 This function uses ONLY the pointer to the rte (thing) and ignores the other
1056 information that symtab foreach function passes (st, entry, and data) which
1057 means that it _can_ safetly be used outside of the foreach setting. If
1058 the function is changed to depend on any of these three, then a stand-alone
1059 rte_cleanup() function should be added and referenced by this, and refererences
1060 to this outside of the foreach world should be changed.
1062 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1066 if( (rte = (rtable_ent_t *) thing) == NULL ) {
1067 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar
1072 if( rte->refs > 0 ) { // something still referencing, so it lives
1076 if( rte->rrgroups ) { // clean up the round robin groups
1077 for( i = 0; i < rte->nrrgroups; i++ ) {
1078 if( rte->rrgroups[i] ) {
1079 free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
1083 free( rte->rrgroups );
1086 free( rte ); // finally, drop the potato
1090 Read an entire file into a buffer. We assume for route table files
1091 they will be smallish and so this won't be a problem.
1092 Returns a pointer to the buffer, or nil. Caller must free.
1093 Terminates the buffer with a nil character for string processing.
1095 If we cannot stat the file, we assume it's empty or missing and return
1096 an empty buffer, as opposed to a nil, so the caller can generate defaults
1097 or error if an empty/missing file isn't tolerated.
1099 static char* uta_fib( char const* fname ) {
1101 off_t fsize = 8192; // size of the file
1102 off_t nread; // number of bytes read
1104 char* buf; // input buffer
1106 if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1107 if( fstat( fd, &stats ) >= 0 ) {
1108 if( stats.st_size <= 0 ) { // empty file
1112 fsize = stats.st_size; // stat ok, save the file size
1115 fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
1119 if( fd < 0 ) { // didn't open or empty
1120 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1128 // add a size limit check here
1130 if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
1136 nread = read( fd, buf, fsize );
1137 if( nread < 0 || nread > fsize ) { // failure of some kind
1139 errno = EFBIG; // likely too much to handle
1150 // --------------------- initialisation/creation ---------------------------------------------
1152 Create and initialise a route table; Returns a pointer to the table struct.
1154 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1160 if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1164 memset( rt, 0, sizeof( *rt ) );
1166 if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1171 rt->gate = ctx->rtgate; // single mutex needed for all route tables
1172 rt->ephash = ctx->ephash; // all route tables share a common endpoint hash
1173 pthread_mutex_init( rt->gate, NULL );
1179 Clones one of the spaces in the given table.
1180 Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1181 Space is the space in the old table to copy. Space 0 uses an integer key and
1182 references rte structs. All other spaces use a string key and reference endpoints.
1184 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1185 endpoint_t* ep; // an endpoint (ignore sonar complaint about const*)
1186 rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*)
1187 void* sst; // source symtab
1188 void* nst; // new symtab
1189 thing_list_t things; // things from the space to copy
1191 int free_on_err = 0;
1196 if( nrt == NULL ) { // make a new table if needed
1198 nrt = uta_rt_init( ctx );
1204 if( srt == NULL ) { // source was nil, just give back the new table
1208 things.nalloc = 2048;
1210 things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1211 memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1212 things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1213 memset( things.names, 0, sizeof( char * ) * things.nalloc );
1214 if( things.things == NULL ) {
1216 rmr_sym_free( nrt->hash );
1224 sst = srt->hash; // convenience pointers (src symtab)
1227 rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
1229 if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
1230 for( i = 0; i < things.nused; i++ ) {
1231 if( space ) { // string key, epoint reference
1232 ep = (endpoint_t *) things.things[i];
1233 rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
1235 rte = (rtable_ent_t *) things.things[i];
1236 rte->refs++; // rtes can be removed, so we track references
1237 rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
1241 free( things.things );
1242 free( (void *) things.names );
1247 Given a destination route table (drt), clone from the source (srt) into it.
1248 If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1249 allocate the drt if that was nil too). If all is true (1), then we will clone both
1250 the MT and the ME spaces; otherwise only the ME space is cloned.
1252 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1253 endpoint_t* ep; // an endpoint
1254 rtable_ent_t* rte; // a route table entry
1261 drt = uta_rt_init( ctx );
1267 drt->ephash = ctx->ephash; // all rts reference the same EP symtab
1268 rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1270 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1277 Prepares the "new" route table for populating. If the old_rtable is not nil, then
1278 we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1279 context to be referenced by the new pointer; the old pointer is set to nil.
1281 If the old table doesn't exist, then a new table is created and the new pointer is
1282 set to reference it.
1284 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1292 if( (rt = ctx->old_rtable) != NULL ) {
1293 ctx->old_rtable = NULL;
1294 while( rt->ref_count > 0 ) { // wait for all who are using to stop
1295 if( counter++ > 1000 ) {
1296 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" );
1300 usleep( 1000 ); // small sleep to yield the processer if that is needed
1303 rmr_sym_clear( rt ); // clear all entries from the old table
1308 rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
1309 rt->ref_count = 0; // take no chances; ensure it's 0!
1316 Given a name, find the endpoint struct in the provided route table.
1318 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1320 if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1324 return rmr_sym_get( rt->ephash, ep_name, 1 );
1328 Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1329 Does NOT destroy the gate as it's a common gate for ALL route tables.
1331 static void uta_rt_drop( route_table_t* rt ) {
1336 rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // free each rte referenced by the hash, but NOT the endpoints
1337 rmr_sym_free( rt->hash ); // free all of the hash related data
1342 Look up and return the pointer to the endpoint stuct matching the given name.
1343 If not in the hash, a new endpoint is created, added to the hash. Should always
1346 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1349 if( !rt || !ep_name || ! *ep_name ) {
1350 rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1355 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
1356 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1357 rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
1362 ep->notify = 1; // show notification on first connection failure
1363 ep->open = 0; // not connected
1364 ep->addr = uta_h2ip( ep_name );
1365 ep->name = strdup( ep_name );
1366 pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
1367 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1369 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1377 Given a session id and message type build a key that can be used to look up the rte in the route
1378 table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1380 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1383 if( sub_id == UNSET_SUBID ) {
1384 key = 0xffffffff00000000 | mtype;
1386 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1393 Given a route table and meid string, find the owner (if known). Returns a pointer to
1394 the endpoint struct or nil.
1396 static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
1397 endpoint_t const* ep; // the ep we found in the hash
1399 if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1403 return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1407 This returns a pointer to the currently active route table and ups
1408 the reference count so that the route table is not freed while it
1409 is being used. The caller MUST call release_rt() when finished
1412 Care must be taken: the ctx->rtable pointer _could_ change during the time
1413 between the release of the lock and the return. Therefore we MUST grab
1414 the current pointer when we have the lock so that if it does we don't
1415 return a pointer to the wrong table.
1417 This will return NULL if there is no active table.
1419 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1420 route_table_t* rrt; // return value
1422 if( ctx == NULL || ctx->rtable == NULL ) {
1426 pthread_mutex_lock( ctx->rtgate ); // must hold lock to bump use
1427 rrt = ctx->rtable; // must stash the pointer while we hold lock
1429 pthread_mutex_unlock( ctx->rtgate );
1431 return rrt; // pointer we upped the count with
1435 This will "release" the route table by reducing the use counter
1436 in the table. The table may not be freed until the counter reaches
1437 0, so it's imparative that the pointer be "released" when it is
1438 fetched by get_rt(). Once the caller has released the table it
1439 may not safely use the pointer that it had.
1441 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1442 if( ctx == NULL || rt == NULL ) {
1446 pthread_mutex_lock( ctx->rtgate ); // must hold lock
1447 if( rt->ref_count > 0 ) { // something smells if it's already 0, don't do antyhing if it is
1450 pthread_mutex_unlock( ctx->rtgate );