56b124ec8d276aec03f1de359f13cb490b4a8a69
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1  // :vi sw=4 ts=4 noet2
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49 #include <immintrin.h>
50 #include <stdbool.h>
51
52 #include <RIC_message_types.h>          // needed for route manager messages
53
54 #define ALL 1
55 #define SOME 0
56
57 /*
58         Passed to a symtab foreach callback to construct a list of pointers from
59         a current symtab.
60 */
61 typedef struct thing_list {
62         int error;                                      // if a realloc failed, this will be set
63         int nalloc;
64         int nused;
65         void** things;
66         const char** names;
67 } thing_list_t;
68
69 // ---- debugging/testing -------------------------------------------------------------------------
70
71 /*
72         Dump some stats for an endpoint in the RT. This is generally called to
73         verify endpoints after a table load/change.
74
75         This is called by the for-each mechanism of the symtab and the prototype is
76         fixe; we don't really use some of the parms, but have dummy references to
77         keep sonar from complaining.
78 */
79 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
80         int*    counter;
81         endpoint_t* ep;
82
83         if( (ep = (endpoint_t *) thing) == NULL ) {
84                 return;
85         }
86
87         if( (counter = (int *) vcounter) != NULL ) {
88                 (*counter)++;
89         } else {
90                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
91                 return;
92         }
93
94         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
95 }
96
97 /*
98         Called to count meid entries in the table. The meid points to an 'owning' endpoint
99         so we can list what we find
100
101         See note in ep_stats about dummy refs.
102 */
103 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
104         int*    counter;
105         endpoint_t* ep;
106
107         if( (ep = (endpoint_t *) thing) == NULL ) {
108                 return;
109         }
110
111         if( (counter = (int *) vcounter) != NULL ) {
112                 (*counter)++;
113         } else {
114                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
115         }
116
117         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
118 }
119
120 /*
121         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
122         the 'source' information and is added to each message.
123
124         See note above about dummy references.
125 */
126 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
127         endpoint_t* ep;
128         char*   id;
129
130         if( (ep = (endpoint_t *) thing) == NULL ) {
131                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
132                 return;
133         }
134
135         if( (id = (char *) vid) == NULL ) {
136                 id = "missing";
137         }
138
139         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
140                 (long long) time( NULL ),
141                 id,
142                 ep->name,
143                 ep->open,
144                 ep->scounts[EPSC_GOOD],
145                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
146                 ep->scounts[EPSC_FAIL],
147                 ep->scounts[EPSC_TRANS]   );
148 }
149
150 /*
151         Dump stats for a route entry in the table.
152 */
153 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
154         int*    counter;
155         rtable_ent_t const* rte;                // thing is really an rte
156         int             mtype;
157         int             sid;
158
159         if( (rte = (rtable_ent_t *) thing) == NULL ) {
160                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
161                 return;
162         }
163
164         if( (counter = (int *) vcounter) != NULL ) {
165                 (*counter)++;
166         }
167
168         mtype = rte->key & 0xffff;
169         sid = (int) (rte->key >> 32);
170
171         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
172 }
173
174 /*
175         Given a route table, cause some stats to be spit out.
176 */
177 static void  rt_stats( route_table_t* rt ) {
178         int* counter;
179
180         if( rt == NULL ) {
181                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
182                 return;
183         }
184
185         counter = (int *) malloc( sizeof( int ) );
186         *counter = 0;
187         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
188         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
189         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
190         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
191
192         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
193         *counter = 0;
194         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
195         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
196
197         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
198         *counter = 0;
199         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
200         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
201
202         free( counter );
203 }
204
205 /*
206         Given a route table, cause endpoint counters to be written to stderr. The id
207         parm is written as the "source" in the output.
208 */
209 static void  rt_epcounts( route_table_t* rt, char* id ) {
210         if( rt == NULL ) {
211                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
212                 return;
213         }
214
215         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
216 }
217
218
219 static void dump_tables( uta_ctx_t *ctx ) {
220         if( ctx->old_rtable != NULL ) {
221                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
222                 rt_stats( ctx->old_rtable );
223         } else {
224                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
225         }
226         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
227         rt_stats( ctx->rtable );
228 }
229
230 // ------------ route manager communication -------------------------------------------------
231 /*
232         Send a request for a table update to the route manager. Updates come in
233         async, so send and go.
234
235         pctx is the private context for the thread; ctx is the application context
236         that we need to be able to send the application ID in case rt mgr needs to
237         use it to idenfity us.
238
239         Returns 0 if we were not able to send a request.
240 */
241 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
242         rmr_mbuf_t*     smsg;
243         int     state = 0;
244
245         if( ctx->rtg_whid < 0 ) {
246                 return state;
247         }
248
249         smsg = rmr_alloc_msg( pctx, 1024 );
250         if( smsg != NULL ) {
251                 smsg->mtype = RMRRM_REQ_TABLE;
252                 smsg->sub_id = 0;
253                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
254                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
255                 smsg->len = strlen( smsg->payload ) + 1;
256
257                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
258                 if( (state = smsg->state) != RMR_OK ) {
259                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
260                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
261                         ctx->rtg_whid = -1;
262                 }
263
264                 rmr_free_msg( smsg );
265         }
266
267         return state;
268 }
269
270 /*
271         Send an ack to the route table manager for a table ID that we are
272         processing.      State is 1 for OK, and 0 for failed. Reason might
273         be populated if we know why there was a failure.
274
275         Context should be the PRIVATE context that we use for messages
276         to route manger and NOT the user's context.
277
278         If a message buffere is passed we use that and use return to sender
279         assuming that this might be a response to a call and that is needed
280         to send back to the proper calling thread. If msg is nil, we allocate
281         and use it.
282 */
283 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
284         int             use_rts = 1;
285         int             payload_size = 1024;
286
287         if( ctx == NULL || ctx->rtg_whid < 0 ) {
288                 return;
289         }
290
291         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
292                 return;
293         }
294
295         if( smsg != NULL ) {
296                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
297         } else {
298                 use_rts = 0;
299                 smsg = rmr_alloc_msg( ctx, payload_size );
300         }
301
302         if( smsg != NULL ) {
303                 smsg->mtype = RMRRM_TABLE_STATE;
304                 smsg->sub_id = -1;
305                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
306                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
307
308                 smsg->len = strlen( smsg->payload ) + 1;
309
310                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
311                 if( use_rts ) {
312                         smsg = rmr_rts_msg( ctx, smsg );
313                 } else {
314                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
315                 }
316                 if( (state = smsg->state) != RMR_OK ) {
317                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
318                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
319                         ctx->rtg_whid = -1;
320                 }
321
322                 if( ! use_rts ) {
323                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
324                 }
325         }
326 }
327
328 // ---- alarm generation --------------------------------------------------------------------------
329
330 /*
331         Given the user's context (not the thread private context) look to see if the application isn't
332         working fast enough and we're dropping messages. If the drop counter has changed since the last
333         peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
334         set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
335
336         The private context is what we use to send so as not to interfere with the user flow.
337 */
338 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
339         static  int alarm_raised = 0;
340         static  int ok2clear = 0;                                       // time that we can clear
341         static  int lastd = 0;                                          // the last counter value so we can compute delta
342         static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
343
344         rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
345         if( ! alarm_raised ) {
346                 if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
347                         return;
348                 }
349
350                 alarm_raised = 1;
351                 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
352                 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
353         } else {
354                 if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
355                         lastd = uctx->dcount;
356                         ok2clear = 0;                                                   // reset the timer
357                         return;
358                 }
359
360                 if( ok2clear == 0 ) {                                           // first round where not dropping
361                         ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
362                 } else {
363                         if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
364                                 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
365                                 alarm_raised = 0;
366                                 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
367                                 prob_id++;
368                         }
369                 }
370         }
371 }
372
373 // ---- utility -----------------------------------------------------------------------------------
374 /*
375         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
376         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
377 */
378 static char* clip( char* buf ) {
379         char*   tok=NULL;
380
381         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
382                 buf++;
383         }
384
385         if( (tok = strchr( buf, '#' )) != NULL ) {
386                 if( tok == buf ) {
387                         return buf;                                     // just push back; leading comment sym handled there
388                 }
389
390                 if( isspace( *(tok-1) ) ) {
391                         *tok = 0;
392                 }
393         }
394
395         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
396         *(tok+1) = 0;
397
398         return buf;
399 }
400
401 /*
402         This accepts a pointer to a nil terminated string, and ensures that there is a
403         newline as the last character. If there is not, a new buffer is allocated and
404         the newline is added.  If a new buffer is allocated, the buffer passed in is
405         freed.  The function returns a pointer which the caller should use, and must
406         free.  In the event of an error, a nil pointer is returned.
407 */
408 static char* ensure_nlterm( char* buf ) {
409         char*   nb = NULL;
410         int             len = 0;
411
412         if( buf != NULL ) {
413                 len = strlen( buf );
414         }
415
416         nb = buf;                                                       // default to returning original as is
417         switch( len ) {
418                 case 0:
419                         nb = strdup( "\n" );
420                         break;
421
422                 case 1:
423                         if( *buf != '\n' ) {            // not a newline; realloc
424                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
425                                 nb = strdup( " \n" );
426                                 *nb = *buf;
427                                 free( buf );
428                         }
429                         break;
430
431                 default:
432                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
433                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
434                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
435                                         memcpy( nb, buf, len );
436                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
437                                         *(nb+len+1) = 0;
438                                         free( buf );
439                                 }
440                         }
441                         break;
442         }
443
444         return nb;
445 }
446
447 /*
448         Roll the new table into the active and the active into the old table. We
449         must have the lock on the active table to do this. It's possible that there
450         is no active table (first load), so we have to account for that (no locking).
451 */
452 static void roll_tables( uta_ctx_t* ctx ) {
453
454         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
455                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
456                 ctx->old_rtable = ctx->new_rtable;
457                 ctx->new_rtable = NULL;
458                 return;
459         }
460
461         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
462                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
463                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
464                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
465                 pthread_mutex_unlock( ctx->rtgate );
466         } else {
467                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
468                 ctx->rtable = ctx->new_rtable;                          // make new the active one
469         }
470
471         ctx->new_rtable = NULL;
472 }
473
474 /*
475         Given a thing list, extend the array of pointers by 1/2 of the current
476         number allocated. If we cannot realloc an array, then we set the error
477         flag.  Unlikely, but will prevent a crash, AND will prevent us from
478         trying to use the table since we couldn't capture everything.
479 */
480 static void extend_things( thing_list_t* tl ) {
481         int old_alloc;
482         void*   old_things;
483         void*   old_names;
484
485         if( tl == NULL ) {
486                 return;
487         }
488
489         old_alloc = tl->nalloc;                         // capture current things
490         old_things = tl->things;
491         old_names = tl->names;
492
493         tl->nalloc += tl->nalloc/2;                     // new allocation size
494
495         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
496         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
497
498         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
499                 tl->error = 1;
500                 return;
501         }
502
503         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
504         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
505
506         free( old_things );
507         free( old_names );
508 }
509
510 // ------------ entry update functions ---------------------------------------------------------------
511 /*
512         Given a message type create a route table entry and add to the hash keyed on the
513         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
514         is the number of group slots to allocate in the entry.
515 */
516 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
517         rtable_ent_t* rte;
518         rtable_ent_t* old_rte;          // entry which was already in the table for the key
519
520         if( rt == NULL ) {
521                 return NULL;
522         }
523
524         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
525                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
526                 return NULL;
527         }
528         memset( rte, 0, sizeof( *rte ) );
529         rte->refs = 1;
530         rte->key = key;
531
532         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
533                 nrrgroups = 10;
534         }
535
536         if( nrrgroups ) {
537                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
538                         free( rte );
539                         return NULL;
540                 }
541                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
542         } else {
543                 rte->rrgroups = NULL;
544         }
545
546         rte->nrrgroups = nrrgroups;
547
548         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
549                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
550         }
551
552         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
553
554         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
555         return rte;
556 }
557
558 /*
559         This accepts partially parsed information from an rte or mse record sent by route manager or read from
560         a file such that:
561                 ts_field is the msg-type,sender field
562                 subid is the integer subscription id
563                 rr_field is the endpoint information for round robening message over
564
565         If all goes well, this will add an RTE to the table under construction.
566
567         The ts_field is checked to see if we should ingest this record. We ingest if one of
568         these is true:
569                 there is no sender info (a generic entry for all)
570                 there is sender and our host:port matches one of the senders
571                 the sender info is an IP address that matches one of our IP addresses
572 */
573 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
574         rtable_ent_t*   rte;            // route table entry added
575         char const*     tok;
576         int             ntoks;
577         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
578         char*   tokens[128];
579         char*   gtokens[64];
580         int             ngtoks;                         // number of tokens in the group list
581         int             grp;                            // index into group list
582         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
583         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
584
585         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
586         rr_field = clip( rr_field );
587
588         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
589                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
590                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
591
592                 key = build_rt_key( subid, atoi( ts_field ) );
593
594                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
595
596                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
597                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
598                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
599                         }
600                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
601                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
602
603                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
604                                 int             i;                                      // avoid sonar grumbling by defining this here
605
606                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
607                                         for( i = 0; i < ntoks; i++ ) {
608                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
609                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
610                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
611                                                         has_ep = TRUE;
612                                                 }
613                                         }
614                                         if( has_ep ) {
615                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
616                                                 has_ep = FALSE;
617                                         }
618                                 }
619                         }
620                 }
621         } else {
622                 if( DEBUG || (vlevel > 2) ) {
623                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
624                 }
625         }
626 }
627
628 /*
629         Trash_entry takes a partially parsed record from the input and
630         will delete the entry if the sender,mtype matches us or it's a
631         generic mtype. The refernce in the new table is removed and the
632         refcounter for the actual rte is decreased. If that ref count is
633         0 then the memory is freed (handled byh the del_rte call).
634 */
635 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
636         rtable_ent_t*   rte;            // route table entry to be 'deleted'
637         char const*     tok;
638         int             ntoks;
639         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
640         char*   tokens[128];
641
642         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
643                 return;
644         }
645
646         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
647
648         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
649                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
650                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
651
652                 key = build_rt_key( subid, atoi( ts_field ) );
653                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
654                 if( rte != NULL ) {
655                         if( DEBUG || (vlevel > 1) ) {
656                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
657                         }
658                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
659                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
660                 } else {
661                         if( DEBUG || (vlevel > 1) ) {
662                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
663                         }
664                 }
665         } else {
666                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
667         }
668 }
669
670 // -------------------------- parse functions --------------------------------------------------
671
672 /*
673         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
674         the 'owner' which should be the dns name or IP address of an enpoint
675         the meid_list is a space separated list of me IDs
676
677         This function assumes the caller has vetted the pointers as needed.
678
679         For each meid in the list, an entry is pushed into the hash which references the owner
680         endpoint such that when the meid is used to route a message it references the endpoint
681         to send messages to.
682 */
683 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
684         char const*     tok;
685         int             ntoks;
686         char*   tokens[128];
687         int             i;
688         int             state;
689         endpoint_t*     ep;                                             // endpoint struct for the owner
690
691         owner = clip( owner );                          // ditch extra whitespace and trailing comments
692         meid_list = clip( meid_list );
693
694         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
695         for( i = 0; i < ntoks; i++ ) {
696                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
697                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
698                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
699                 } else {
700                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
701                 }
702         }
703 }
704
705 /*
706         Given the tokens from an mme_del, delete the listed meid entries from the new
707         table. The list is a space separated list of meids.
708
709         The meids in the hash reference endpoints which are never deleted and so
710         the only thing that we need to do here is to remove the meid from the hash.
711
712         This function assumes the caller has vetted the pointers as needed.
713 */
714 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
715         char const*     tok;
716         int             ntoks;
717         char*   tokens[128];
718         int             i;
719
720         if( rtab->hash == NULL ) {
721                 return;
722         }
723
724         meid_list = clip( meid_list );
725
726         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
727         for( i = 0; i < ntoks; i++ ) {
728                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
729                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
730         }
731 }
732
733 /*
734         Parse a partially parsed meid record. Tokens[0] should be one of:
735                 meid_map, mme_ar, mme_del.
736
737         pctx is the private context needed to return an ack/nack using the provided
738         message buffer with the route managers address info.
739 */
740 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
741         char wbuf[1024];
742
743         if( tokens == NULL || ntoks < 1 ) {
744                 return;                                                 // silent but should never happen
745         }
746
747         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
748                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
749                 return;
750         }
751
752         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
753                 tokens[1] = clip( tokens[1] );
754                 if( *(tokens[1]) == 's' ) {
755                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
756                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
757                                 uta_rt_drop( ctx->new_rtable );
758                                 ctx->new_rtable = NULL;
759                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
760                         }
761
762                         if( ctx->table_id != NULL ) {
763                                 free( ctx->table_id );
764                         }
765                         if( ntoks > 2 ) {
766                                 ctx->table_id = strdup( clip( tokens[2] ) );
767                         } else {
768                                 ctx->table_id = NULL;
769                         }
770
771                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
772                         ctx->new_rtable->mupdates = 0;
773
774                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
775                 } else {
776                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
777                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
778                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
779                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
780                                                                 ctx->new_rtable->mupdates, tokens[2] );
781                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
782                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
783                                                 uta_rt_drop( ctx->new_rtable );
784                                                 ctx->new_rtable = NULL;
785                                                 return;
786                                         }
787
788                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
789                                 }
790
791                                 if( ctx->new_rtable ) {
792                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
793                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
794                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
795
796                                         if( vlevel > 0 ) {
797                                                 if( ctx->old_rtable != NULL ) {
798                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
799                                                         rt_stats( ctx->old_rtable );
800                                                 } else {
801                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
802                                                 }
803                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
804                                                 rt_stats( ctx->rtable );
805                                         }
806                                 } else {
807                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
808                                         ctx->new_rtable = NULL;
809                                 }
810                         }
811                 }
812
813                 return;
814         }
815
816         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
817                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
818                 return;
819         }
820
821         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
822                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
823                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
824                         return;
825                 }
826                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
827                 ctx->new_rtable->mupdates++;
828                 return;
829         }
830
831         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
832                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
833                 ctx->new_rtable->mupdates++;
834                 return;
835         }
836 }
837
838 /*
839         This will close the current table snarf file (in *.inc) and open a new one.
840         The curent one is renamed. The final file name is determined by the setting of
841         RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
842         an additional extension of .snarf.  If neither seed or snarf environment vars are
843         set then this does nothing.
844
845         If this is called before the tmp snarf file is opened, then this just opens the file.
846 */
847 static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
848         static int              ok2warn = 0;    // some warnings squelched on first call
849
850         char*   seed_fname;                             // the filename from env
851         char    tfname[512];                    // temp fname
852         char    wfname[512];                    // working buffer for filename
853         char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
854
855         if( ctx == NULL ) {
856                 return;
857         }
858
859         if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
860                 if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
861                         memset( wfname, 0, sizeof( wfname ) );
862                         snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
863                         snarf_fname = wfname;
864                 }
865         }
866
867         if( snarf_fname == NULL ) {
868                 rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
869                 return;
870         }
871
872         memset( tfname, 0, sizeof( tfname ) );
873         snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
874
875         if( ctx->snarf_rt_fd >= 0 ) {
876                 char* msg= "### captured from route manager\n";
877                 write( ctx->snarf_rt_fd, msg, strlen( msg ) );
878                 if( close( ctx->snarf_rt_fd ) < 0 ) {
879                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
880                         return;
881                 }
882
883                 if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
884                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
885                 }
886
887                 if( rename( tfname, snarf_fname ) ) {
888                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
889                 } else {
890                         rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
891                 }
892         }
893         ok2warn = 1;
894
895         ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
896         if( ctx->snarf_rt_fd < 0 ) {
897                 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
898         } else {
899                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
900         }
901 }
902
903 /*
904         Parse a single record recevied from the route table generator, or read
905         from a static route table file.  Start records cause a new table to
906         be started (if a partial table was received it is discarded. Table
907         entry records are added to the currenly 'in progress' table, and an
908         end record causes the in progress table to be finalised and the
909         currently active table is replaced.
910
911         The updated table will be activated when the *|end record is encountered.
912         However, to allow for a "double" update, where both the meid map and the
913         route table must be updated at the same time, the end indication on a
914         route table (new or update) may specifiy "hold" which indicates that meid
915         map entries are to follow and the updated route table should be held as
916         pending until the end of the meid map is received and validated.
917
918         CAUTION:  we are assuming that there is a single route/meid map generator
919                 and as such only one type of update is received at a time; in other
920                 words, the sender cannot mix update records and if there is more than
921                 one sender process they must synchronise to avoid issues.
922
923
924         For a RT update, we expect:
925                 newrt | start | <table-id>
926                 newrt | end | <count>
927                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
928                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
929                 mse| <mtype>[,sender] | <sub-id> | %meid
930
931
932         For a meid map update we expect:
933                 meid_map | start | <table-id>
934                 meid_map | end | <count> | <md5-hash>
935                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
936                 mme_del | <meid0> <meid1>...<meidn>
937
938
939         The pctx is our private context that must be used to send acks/status
940         messages back to the route manager.  The regular ctx is the ctx that
941         the user has been given and thus that's where we have to hang the route
942         table we're working with.
943
944         If mbuf is given, and we need to ack, then we ack using the mbuf and a
945         return to sender call (allows route manager to use wh_call() to send
946         an update and rts is required to get that back to the right thread).
947         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
948         will be used.
949 */
950 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
951         int i;
952         int ntoks;                                                      // number of tokens found in something
953         int ngtoks;
954         int     grp;                                                    // group number
955         rtable_ent_t const*     rte;                    // route table entry added
956         char*   tokens[128];
957         char*   tok=NULL;                                               // pointer into a token or string
958         char    wbuf[1024];
959
960         if( ! buf ) {
961                 return;
962         }
963
964         if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
965                 write( ctx->snarf_rt_fd, buf, strlen( buf ) );
966                 write( ctx->snarf_rt_fd, "\n", 1 );
967         }
968
969         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
970                 buf++;
971         }
972         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
973         *(tok+1) = 0;
974
975         memset( tokens, 0, sizeof( tokens ) );
976         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
977                 tokens[0] = clip( tokens[0] );
978                 switch( *(tokens[0]) ) {
979                         case 0:                                                                                                 // ignore blanks
980                                 // fallthrough
981                         case '#':                                                                                               // and comment lines
982                                 break;
983
984                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
985                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
986                                         break;
987                                 }
988
989                                 if( ntoks < 3 ) {
990                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
991                                         break;
992                                 }
993
994                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
995                                 ctx->new_rtable->updates++;
996                                 break;
997
998                         case 'n':                                                                                               // newrt|{start|end}
999                                 tokens[1] = clip( tokens[1] );
1000                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1001                                         if( ctx && ctx->snarf_rt_fd >= 0 ) {
1002                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1003                                         }
1004
1005                                         if( ntoks >2 ) {
1006                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1007                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1008                                                                 ctx->new_rtable->updates, tokens[2] );
1009                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
1010                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1011                                                         uta_rt_drop( ctx->new_rtable );
1012                                                         ctx->new_rtable = NULL;
1013                                                         break;
1014                                                 }
1015                                         }
1016
1017                                         if( ctx->new_rtable ) {
1018                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1019                                                 if( DEBUG > 1 || (vlevel > 1) ) {
1020                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
1021                                                         dump_tables( ctx );
1022                                                 }
1023
1024                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1025                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1026                                                 ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
1027                                         } else {
1028                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
1029                                                 ctx->new_rtable = NULL;
1030                                         }
1031                                 } else {                                                                                                                        // start a new table.
1032                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
1033                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1034
1035                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1036                                                 uta_rt_drop( ctx->new_rtable );
1037                                                 ctx->new_rtable = NULL;
1038                                         }
1039
1040                                         if( ctx->table_id != NULL ) {
1041                                                 free( ctx->table_id );
1042                                         }
1043                                         if( ntoks >2 ) {
1044                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1045                                         } else {
1046                                                 ctx->table_id = NULL;
1047                                         }
1048
1049                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
1050                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
1051
1052                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
1053                                 }
1054                                 break;
1055
1056                         case 'm':                                                                       // mse entry or one of the meid_ records
1057                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
1058                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1059                                                 break;
1060                                         }
1061
1062                                         if( ntoks < 4 ) {
1063                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
1064                                                 break;
1065                                         }
1066
1067                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
1068                                         ctx->new_rtable->updates++;
1069                                 } else {
1070                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
1071                                 }
1072                                 break;
1073
1074                         case 'r':                                       // assume rt entry
1075                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1076                                         break;
1077                                 }
1078
1079                                 ctx->new_rtable->updates++;
1080                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
1081                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1082                                 } else {
1083                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
1084                                 }
1085                                 break;
1086
1087                         case 'u':                                                                                               // update current table, not a total replacement
1088                                 if( ! (ctx->flags & CFL_FULLRT) ) {                                     // we cannot update until we have a full table from route generator
1089                                         rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
1090                                         break;
1091                                 }
1092
1093                                 tokens[1] = clip( tokens[1] );
1094                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1095                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
1096                                                 break;
1097                                         }
1098                                         if( ctx->snarf_rt_fd >= 0 ) {
1099                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1100                                         }
1101
1102                                         if( ntoks >2 ) {
1103                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1104                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1105                                                                 ctx->new_rtable->updates, tokens[2] );
1106                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1107                                                         uta_rt_drop( ctx->new_rtable );
1108                                                         ctx->new_rtable = NULL;
1109                                                         break;
1110                                                 }
1111                                         }
1112
1113                                         if( ctx->new_rtable ) {
1114                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1115                                                 if( DEBUG > 1 || (vlevel > 1) )  {
1116                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1117                                                         dump_tables( ctx );
1118                                                 }
1119
1120                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1121                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1122                                         } else {
1123                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1124                                                 ctx->new_rtable = NULL;
1125                                         }
1126                                 } else {                                                                                        // start a new table.
1127                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
1128                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1129                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1130                                                 uta_rt_drop( ctx->new_rtable );
1131                                                 ctx->new_rtable = NULL;
1132                                         }
1133
1134                                         if( ntoks > 2 ) {
1135                                                 if( ctx->table_id != NULL ) {
1136                                                         free( ctx->table_id );
1137                                                 }
1138                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1139                                         }
1140
1141                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1142                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1143
1144                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1145                                 }
1146                                 break;
1147
1148                         default:
1149                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1150                                 break;
1151                 }
1152         }
1153 }
1154
1155 /*
1156         This function attempts to open a static route table in order to create a 'seed'
1157         table during initialisation.  The environment variable RMR_SEED_RT is expected
1158         to contain the necessary path to the file. If missing, or if the file is empty,
1159         no route table will be available until one is received from the generator.
1160
1161         This function is probably most useful for testing situations, or extreme
1162         cases where the routes are static.
1163 */
1164 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1165         int             i;
1166         char*   fname;
1167         char*   fbuf;                           // buffer with file contents
1168         char*   rec;                            // start of the record
1169         char*   eor;                            // end of the record
1170         int             rcount = 0;                     // record count for debug
1171
1172         if( (fname = ctx->seed_rt_fname) == NULL ) {
1173                 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1174                         return;
1175                 }
1176
1177                 ctx->seed_rt_fname = strdup( fname );
1178                 fname = ctx->seed_rt_fname;
1179         }
1180
1181         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1182                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1183                 return;
1184         }
1185
1186         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1187         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1188                 if( *eor == '\r' ) {
1189                         *eor = '\n';                                                            // will look like a blank line which is ok
1190                 }
1191         }
1192
1193         rec = fbuf;
1194         while( rec && *rec ) {
1195                 rcount++;
1196                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1197                         *eor = 0;
1198                 } else {
1199                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1200                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1201                         free( fbuf );
1202                         return;
1203                 }
1204
1205                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1206
1207                 rec = eor+1;
1208         }
1209
1210         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1211         free( fbuf );
1212 }
1213
1214 /*
1215         Callback driven for each thing in a symtab. We collect the pointers to those
1216         things for later use (cloning).
1217 */
1218 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1219         thing_list_t*   tl;
1220
1221         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1222                 return;
1223         }
1224
1225         if( thing == NULL ) {
1226                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1227                 return;
1228         }
1229
1230         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1231         tl->things[tl->nused++] = thing;                // save a reference to the thing
1232
1233         if( tl->nused >= tl->nalloc ) {
1234                 extend_things( tl );                            // not enough allocated
1235         }
1236 }
1237
1238 /*
1239         Called to delete a route table entry struct. We delete the array of endpoint
1240         pointers, but NOT the endpoints referenced as those are referenced from
1241         multiple entries.
1242
1243         Route table entries can be concurrently referenced by multiple symtabs, so
1244         the actual delete happens only if decrementing the rte's ref count takes it
1245         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1246         the symtab, or overlaying an entry.
1247
1248         This function uses ONLY the pointer to the rte (thing) and ignores the other
1249         information that symtab foreach function passes (st, entry, and data) which
1250         means that it _can_ safetly be used outside of the foreach setting. If
1251         the function is changed to depend on any of these three, then a stand-alone
1252         rte_cleanup() function should be added and referenced by this, and refererences
1253         to this outside of the foreach world should be changed.
1254 */
1255 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1256         rtable_ent_t*   rte;
1257         int i;
1258
1259         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1260                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1261                 return;
1262         }
1263
1264         rte->refs--;
1265         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1266                 return;
1267         }
1268
1269         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1270                 for( i = 0; i < rte->nrrgroups; i++ ) {
1271                         if( rte->rrgroups[i] ) {
1272                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1273                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1274                         }
1275
1276                 }
1277
1278                 free( rte->rrgroups );
1279         }
1280
1281         free( rte );                                                                                    // finally, drop the potato
1282 }
1283
1284 /*
1285         Read an entire file into a buffer. We assume for route table files
1286         they will be smallish and so this won't be a problem.
1287         Returns a pointer to the buffer, or nil. Caller must free.
1288         Terminates the buffer with a nil character for string processing.
1289
1290         If we cannot stat the file, we assume it's empty or missing and return
1291         an empty buffer, as opposed to a nil, so the caller can generate defaults
1292         or error if an empty/missing file isn't tolerated.
1293 */
1294 static char* uta_fib( char const* fname ) {
1295         struct stat     stats;
1296         off_t           fsize = 8192;   // size of the file
1297         off_t           nread;                  // number of bytes read
1298         int                     fd;
1299         char*           buf;                    // input buffer
1300
1301         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1302                 if( fstat( fd, &stats ) >= 0 ) {
1303                         if( stats.st_size <= 0 ) {                                      // empty file
1304                                 close( fd );
1305                                 fd = -1;
1306                         } else {
1307                                 fsize = stats.st_size;                                          // stat ok, save the file size
1308                         }
1309                 } else {
1310                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1311                 }
1312         }
1313
1314         if( fd < 0 ) {                                                                                  // didn't open or empty
1315                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1316                         return NULL;
1317                 }
1318
1319                 *buf = 0;
1320                 return buf;
1321         }
1322
1323         // add a size limit check here
1324
1325         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1326                 close( fd );
1327                 errno = ENOMEM;
1328                 return NULL;
1329         }
1330
1331         nread = read( fd, buf, fsize );
1332         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1333                 free( buf );
1334                 errno = EFBIG;                                                                                  // likely too much to handle
1335                 close( fd );
1336                 return NULL;
1337         }
1338
1339         buf[nread] = 0;
1340
1341         close( fd );
1342         return buf;
1343 }
1344
1345 // --------------------- initialisation/creation ---------------------------------------------
1346 /*
1347         Create and initialise a route table; Returns a pointer to the table struct.
1348 */
1349 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1350         route_table_t*  rt;
1351
1352         if( ctx == NULL ) {
1353                 return NULL;
1354         }
1355         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1356                 return NULL;
1357         }
1358
1359         memset( rt, 0, sizeof( *rt ) );
1360
1361         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1362                 free( rt );
1363                 return NULL;
1364         }
1365
1366         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1367         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1368         pthread_mutex_init( rt->gate, NULL );
1369
1370         return rt;
1371 }
1372
1373 /*
1374         Clones one of the spaces in the given table.
1375         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1376         Space is the space in the old table to copy. Space 0 uses an integer key and
1377         references rte structs. All other spaces use a string key and reference endpoints.
1378 */
1379 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1380         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1381         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1382         void*   sst;                    // source symtab
1383         void*   nst;                    // new symtab
1384         thing_list_t things;    // things from the space to copy
1385         int             i;
1386         int             free_on_err = 0;
1387
1388         if( ctx == NULL ) {
1389                 return NULL;
1390         }
1391         if( nrt == NULL ) {                             // make a new table if needed
1392                 free_on_err = 1;
1393                 nrt = uta_rt_init( ctx );
1394                 if( nrt == NULL ) {
1395                         return NULL;
1396                 }
1397         }
1398
1399         if( srt == NULL ) {             // source was nil, just give back the new table
1400                 return nrt;
1401         }
1402
1403         things.nalloc = 2048;
1404         things.nused = 0;
1405         things.error = 0;
1406         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1407         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1408         if( things.things == NULL || things.names == NULL ){
1409                 if( things.things != NULL ) {
1410                         free( things.things );
1411                 }
1412                 if( things.names != NULL ) {
1413                         free( things.names );
1414                 }
1415
1416                 if( free_on_err ) {
1417                         rmr_sym_free( nrt->hash );
1418                         free( nrt );
1419                         nrt = NULL;
1420                 } else {
1421                         nrt->error = 1;
1422                 }
1423
1424                 return nrt;
1425         }
1426         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1427         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1428
1429         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1430         nst = nrt->hash;
1431
1432         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1433         if( things.error ) {                            // something happened and capture failed
1434                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1435                 free( things.things );
1436                 free( things.names );
1437                 if( free_on_err ) {
1438                         rmr_sym_free( nrt->hash );
1439                         free( nrt );
1440                         nrt = NULL;
1441                 } else {
1442                         nrt->error = 1;
1443                 }
1444                 return nrt;
1445         }
1446
1447         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1448         for( i = 0; i < things.nused; i++ ) {
1449                 if( space ) {                                                                                           // string key, epoint reference
1450                         ep = (endpoint_t *) things.things[i];
1451                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1452                 } else {
1453                         rte = (rtable_ent_t *) things.things[i];
1454                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1455                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1456                 }
1457         }
1458
1459         free( things.things );
1460         free( (void *) things.names );
1461         return nrt;
1462 }
1463
1464 /*
1465         Given a destination route table (drt), clone from the source (srt) into it.
1466         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1467         allocate the drt if that was nil too). If all is true (1), then we will clone both
1468         the MT and the ME spaces; otherwise only the ME space is cloned.
1469 */
1470 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1471         endpoint_t*             ep;                             // an endpoint
1472         rtable_ent_t*   rte;                    // a route table entry
1473         int i;
1474
1475         if( ctx == NULL ) {
1476                 return NULL;
1477         }
1478         if( drt == NULL ) {
1479                 drt = uta_rt_init( ctx );
1480         }
1481         if( srt == NULL ) {
1482                 return drt;
1483         }
1484
1485         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1486         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1487         if( all ) {
1488                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1489         }
1490
1491         return drt;
1492 }
1493
1494 /*
1495         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1496         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1497         context to be referenced by the new pointer; the old pointer is set to nil.
1498
1499         If the old table doesn't exist, then a new table is created and the new pointer is
1500         set to reference it.
1501
1502         The ME namespace references endpoints which do not need to be released, therefore we
1503         do not need to run that portion of the table to deref like we do for the RTEs.
1504 */
1505 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1506         int counter = 0;
1507         int ref_count;
1508         route_table_t*  rt;
1509
1510         if( ctx == NULL ) {
1511                 return NULL;
1512         }
1513
1514         if( (rt = ctx->old_rtable) != NULL ) {
1515                 ctx->old_rtable = NULL;
1516
1517                 pthread_mutex_lock( ctx->rtgate );
1518                 ref_count = rt->ref_count;
1519                 pthread_mutex_unlock( ctx->rtgate );
1520
1521                 while( ref_count > 0 ) {                                // wait for all who are using to stop
1522                         if( counter++ > 1000 ) {
1523                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1524                                 break;
1525                         }
1526
1527                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1528
1529                         pthread_mutex_lock( ctx->rtgate );
1530                         ref_count = rt->ref_count;
1531                         pthread_mutex_unlock( ctx->rtgate );
1532                 }
1533
1534                 if( rt->hash != NULL ) {
1535                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1536                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1537                 }
1538
1539                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1540         } else {
1541                 rt = NULL;
1542         }
1543
1544         pthread_mutex_destroy(ctx->rtgate);
1545         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1546         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1547                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1548         } else {
1549                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1550                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1551                 rt->error = 1;
1552         }
1553
1554         return rt;
1555 }
1556
1557
1558 /*
1559         Given a name, find the endpoint struct in the provided route table.
1560 */
1561 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1562
1563         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1564                 return NULL;
1565         }
1566
1567         return rmr_sym_get( rt->ephash, ep_name, 1 );
1568 }
1569
1570 /*
1571         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1572         Does NOT destroy the gate as it's a common gate for ALL route tables.
1573 */
1574 static void uta_rt_drop( route_table_t* rt ) {
1575         if( rt == NULL ) {
1576                 return;
1577         }
1578
1579         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1580         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1581         free( rt );
1582 }
1583
1584 /*
1585         Look up and return the pointer to the endpoint stuct matching the given name.
1586         If not in the hash, a new endpoint is created, added to the hash. Should always
1587         return a pointer.
1588 */
1589 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1590         endpoint_t*     ep;
1591
1592         if( !rt || !ep_name || ! *ep_name ) {
1593                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1594                 errno = EINVAL;
1595                 return NULL;
1596         }
1597
1598         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1599                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1600                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1601                         errno = ENOMEM;
1602                         return NULL;
1603                 }
1604
1605                 ep->notify = 1;                                                         // show notification on first connection failure
1606                 ep->open = 0;                                                           // not connected
1607                 ep->addr = uta_h2ip( ep_name );
1608                 ep->name = strdup( ep_name );
1609                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1610                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1611
1612                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1613         }
1614
1615         return ep;
1616 }
1617
1618
1619 /*
1620         Given a session id and message type build a key that can be used to look up the rte in the route
1621         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1622 */
1623 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1624         uint64_t key;
1625
1626         if( sub_id == UNSET_SUBID ) {
1627                 key = 0xffffffff00000000 | mtype;
1628         } else {
1629                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1630         }
1631
1632         return key;
1633 }
1634
1635 /*
1636         Given a route table and meid string, find the owner (if known). Returns a pointer to
1637         the endpoint struct or nil.
1638 */
1639 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1640         endpoint_t const* ep;           // the ep we found in the hash
1641
1642         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1643                 return NULL;
1644         }
1645
1646         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1647 }
1648
1649 /*
1650         This returns a pointer to the currently active route table and ups
1651         the reference count so that the route table is not freed while it
1652         is being used. The caller MUST call release_rt() when finished
1653         with the pointer.
1654
1655         Care must be taken: the ctx->rtable pointer _could_ change during the time
1656         between the release of the lock and the return. Therefore we MUST grab
1657         the current pointer when we have the lock so that if it does we don't
1658         return a pointer to the wrong table.
1659
1660         This will return NULL if there is no active table.
1661 */
1662 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1663         route_table_t*  rrt;                    // return value
1664
1665         if( ctx == NULL || ctx->rtable == NULL ) {
1666                 return NULL;
1667         }
1668
1669         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1670         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1671         rrt->ref_count++;
1672         pthread_mutex_unlock( ctx->rtgate );
1673
1674         return rrt;                                                                             // pointer we upped the count with
1675 }
1676
1677 /*
1678         This will "release" the route table by reducing the use counter
1679         in the table. The table may not be freed until the counter reaches
1680         0, so it's imparative that the pointer be "released" when it is
1681         fetched by get_rt().  Once the caller has released the table it
1682         may not safely use the pointer that it had.
1683 */
1684 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1685         if( ctx == NULL || rt == NULL ) {
1686                 return;
1687         }
1688
1689         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1690         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1691                 rt->ref_count--;
1692         }
1693         pthread_mutex_unlock( ctx->rtgate );
1694 }
1695
1696 int isspace_with_fence(int c) {
1697         _mm_lfence();
1698         return isspace( c );
1699 }
1700
1701 #endif