0de69c83a202a1132a2b06d98c8fe92395e4ae83
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1  // :vi sw=4 ts=4 noet2
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49 #include <immintrin.h>
50 #include <stdbool.h>
51
52 #include <RIC_message_types.h>          // needed for route manager messages
53
54 #define ALL 1
55 #define SOME 0
56
57 /*
58         Passed to a symtab foreach callback to construct a list of pointers from
59         a current symtab.
60 */
61 typedef struct thing_list {
62         int error;                                      // if a realloc failed, this will be set
63         int nalloc;
64         int nused;
65         void** things;
66         const char** names;
67 } thing_list_t;
68
69 // ---- debugging/testing -------------------------------------------------------------------------
70
71 /*
72         Dump some stats for an endpoint in the RT. This is generally called to
73         verify endpoints after a table load/change.
74
75         This is called by the for-each mechanism of the symtab and the prototype is
76         fixe; we don't really use some of the parms, but have dummy references to
77         keep sonar from complaining.
78 */
79 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
80         int*    counter;
81         endpoint_t* ep;
82
83         if( (ep = (endpoint_t *) thing) == NULL ) {
84                 return;
85         }
86
87         if( (counter = (int *) vcounter) != NULL ) {
88                 (*counter)++;
89         } else {
90                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
91                 return;
92         }
93
94         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
95 }
96
97 /*
98         Called to count meid entries in the table. The meid points to an 'owning' endpoint
99         so we can list what we find
100
101         See note in ep_stats about dummy refs.
102 */
103 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
104         int*    counter;
105         endpoint_t* ep;
106
107         if( (ep = (endpoint_t *) thing) == NULL ) {
108                 return;
109         }
110
111         if( (counter = (int *) vcounter) != NULL ) {
112                 (*counter)++;
113         } else {
114                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
115         }
116
117         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
118 }
119
120 /*
121         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
122         the 'source' information and is added to each message.
123
124         See note above about dummy references.
125 */
126 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
127         endpoint_t* ep;
128         char*   id;
129
130         if( (ep = (endpoint_t *) thing) == NULL ) {
131                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
132                 return;
133         }
134
135         if( (id = (char *) vid) == NULL ) {
136                 id = "missing";
137         }
138
139         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
140                 (long long) time( NULL ),
141                 id,
142                 ep->name,
143                 ep->open,
144                 ep->scounts[EPSC_GOOD],
145                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
146                 ep->scounts[EPSC_FAIL],
147                 ep->scounts[EPSC_TRANS]   );
148 }
149
150 /*
151         Dump stats for a route entry in the table.
152 */
153 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
154         int*    counter;
155         rtable_ent_t const* rte;                // thing is really an rte
156         int             mtype;
157         int             sid;
158
159         if( (rte = (rtable_ent_t *) thing) == NULL ) {
160                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
161                 return;
162         }
163
164         if( (counter = (int *) vcounter) != NULL ) {
165                 (*counter)++;
166         }
167
168         mtype = rte->key & 0xffff;
169         sid = (int) (rte->key >> 32);
170
171         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
172 }
173
174 /*
175         Given a route table, cause some stats to be spit out.
176 */
177 static void  rt_stats( route_table_t* rt ) {
178         int* counter;
179
180         if( rt == NULL ) {
181                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
182                 return;
183         }
184
185         counter = (int *) malloc( sizeof( int ) );
186         *counter = 0;
187         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
188         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
189         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
190         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
191
192         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
193         *counter = 0;
194         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
195         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
196
197         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
198         *counter = 0;
199         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
200         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
201
202         free( counter );
203 }
204
205 /*
206         Given a route table, cause endpoint counters to be written to stderr. The id
207         parm is written as the "source" in the output.
208 */
209 static void  rt_epcounts( route_table_t* rt, char* id ) {
210         if( rt == NULL ) {
211                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
212                 return;
213         }
214
215         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
216 }
217
218
219 static void dump_tables( uta_ctx_t *ctx ) {
220         if( ctx->old_rtable != NULL ) {
221                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
222                 rt_stats( ctx->old_rtable );
223         } else {
224                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
225         }
226         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
227         rt_stats( ctx->rtable );
228 }
229
230 // ------------ route manager communication -------------------------------------------------
231 /*
232         Send a request for a table update to the route manager. Updates come in
233         async, so send and go.
234
235         pctx is the private context for the thread; ctx is the application context
236         that we need to be able to send the application ID in case rt mgr needs to
237         use it to idenfity us.
238
239         Returns 0 if we were not able to send a request.
240 */
241 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
242         rmr_mbuf_t*     smsg;
243         int     state = 0;
244
245         if( ctx->rtg_whid < 0 ) {
246                 return state;
247         }
248
249         smsg = rmr_alloc_msg( pctx, 1024 );
250         if( smsg != NULL ) {
251                 smsg->mtype = RMRRM_REQ_TABLE;
252                 smsg->sub_id = 0;
253                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
254                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
255                 smsg->len = strlen( smsg->payload ) + 1;
256
257                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
258                 if( (state = smsg->state) != RMR_OK ) {
259                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
260                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
261                         ctx->rtg_whid = -1;
262                 }
263
264                 rmr_free_msg( smsg );
265         }
266
267         return state;
268 }
269
270 /*
271         Send an ack to the route table manager for a table ID that we are
272         processing.      State is 1 for OK, and 0 for failed. Reason might
273         be populated if we know why there was a failure.
274
275         Context should be the PRIVATE context that we use for messages
276         to route manger and NOT the user's context.
277
278         If a message buffere is passed we use that and use return to sender
279         assuming that this might be a response to a call and that is needed
280         to send back to the proper calling thread. If msg is nil, we allocate
281         and use it.
282 */
283 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
284         int             use_rts = 1;
285         int             payload_size = 1024;
286
287         if( ctx == NULL || ctx->rtg_whid < 0 ) {
288                 return;
289         }
290
291         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
292                 return;
293         }
294
295         if( smsg != NULL ) {
296                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
297         } else {
298                 use_rts = 0;
299                 smsg = rmr_alloc_msg( ctx, payload_size );
300         }
301
302         if( smsg != NULL ) {
303                 smsg->mtype = RMRRM_TABLE_STATE;
304                 smsg->sub_id = -1;
305                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
306                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
307
308                 smsg->len = strlen( smsg->payload ) + 1;
309
310                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
311                 if( use_rts ) {
312                         smsg = rmr_rts_msg( ctx, smsg );
313                 } else {
314                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
315                 }
316                 if( (state = smsg->state) != RMR_OK ) {
317                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
318                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
319                         ctx->rtg_whid = -1;
320                 }
321
322                 if( ! use_rts ) {
323                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
324                 }
325         }
326 }
327
328 // ---- alarm generation --------------------------------------------------------------------------
329
330 /*
331         Given the user's context (not the thread private context) look to see if the application isn't
332         working fast enough and we're dropping messages. If the drop counter has changed since the last
333         peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
334         set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
335
336         The private context is what we use to send so as not to interfere with the user flow.
337 */
338 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
339         static  int alarm_raised = 0;
340         static  int ok2clear = 0;                                       // time that we can clear
341         static  int lastd = 0;                                          // the last counter value so we can compute delta
342         static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
343
344         rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
345         if( ! alarm_raised ) {
346                 if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
347                         return;
348                 }
349
350                 alarm_raised = 1;
351                 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
352                 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
353         } else {
354                 if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
355                         lastd = uctx->dcount;
356                         ok2clear = 0;                                                   // reset the timer
357                         return;
358                 }
359
360                 if( ok2clear == 0 ) {                                           // first round where not dropping
361                         ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
362                 } else {
363                         if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
364                                 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
365                                 alarm_raised = 0;
366                                 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
367                                 prob_id++;
368                         }
369                 }
370         }
371 }
372
373 // ---- utility -----------------------------------------------------------------------------------
374
375 int isspace_with_fence(int c) {
376         _mm_lfence();
377         return isspace( c );
378 }
379
380 /*
381         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
382         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
383 */
384 static char* clip( char* buf ) {
385         char*   tok=NULL;
386
387         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
388                 buf++;
389         }
390
391         if( (tok = strchr( buf, '#' )) != NULL ) {
392                 if( tok == buf ) {
393                         return buf;                                     // just push back; leading comment sym handled there
394                 }
395
396                 if( isspace( *(tok-1) ) ) {
397                         *tok = 0;
398                 }
399         }
400
401         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
402         *(tok+1) = 0;
403
404         return buf;
405 }
406
407 /*
408         This accepts a pointer to a nil terminated string, and ensures that there is a
409         newline as the last character. If there is not, a new buffer is allocated and
410         the newline is added.  If a new buffer is allocated, the buffer passed in is
411         freed.  The function returns a pointer which the caller should use, and must
412         free.  In the event of an error, a nil pointer is returned.
413 */
414 static char* ensure_nlterm( char* buf ) {
415         char*   nb = NULL;
416         int             len = 0;
417
418         if( buf != NULL ) {
419                 len = strlen( buf );
420         }
421
422         nb = buf;                                                       // default to returning original as is
423         switch( len ) {
424                 case 0:
425                         nb = strdup( "\n" );
426                         break;
427
428                 case 1:
429                         if( *buf != '\n' ) {            // not a newline; realloc
430                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
431                                 nb = strdup( " \n" );
432                                 *nb = *buf;
433                                 free( buf );
434                         }
435                         break;
436
437                 default:
438                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
439                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
440                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
441                                         memcpy( nb, buf, len );
442                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
443                                         *(nb+len+1) = 0;
444                                         free( buf );
445                                 }
446                         }
447                         break;
448         }
449
450         return nb;
451 }
452
453 /*
454         Roll the new table into the active and the active into the old table. We
455         must have the lock on the active table to do this. It's possible that there
456         is no active table (first load), so we have to account for that (no locking).
457 */
458 static void roll_tables( uta_ctx_t* ctx ) {
459
460         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
461                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
462                 ctx->old_rtable = ctx->new_rtable;
463                 ctx->new_rtable = NULL;
464                 return;
465         }
466
467         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
468                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
469                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
470                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
471                 pthread_mutex_unlock( ctx->rtgate );
472         } else {
473                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
474                 ctx->rtable = ctx->new_rtable;                          // make new the active one
475         }
476
477         ctx->new_rtable = NULL;
478 }
479
480 /*
481         Given a thing list, extend the array of pointers by 1/2 of the current
482         number allocated. If we cannot realloc an array, then we set the error
483         flag.  Unlikely, but will prevent a crash, AND will prevent us from
484         trying to use the table since we couldn't capture everything.
485 */
486 static void extend_things( thing_list_t* tl ) {
487         int old_alloc;
488         void*   old_things;
489         void*   old_names;
490
491         if( tl == NULL ) {
492                 return;
493         }
494
495         old_alloc = tl->nalloc;                         // capture current things
496         old_things = tl->things;
497         old_names = tl->names;
498
499         tl->nalloc += tl->nalloc/2;                     // new allocation size
500
501         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
502         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
503
504         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
505                 tl->error = 1;
506                 return;
507         }
508
509         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
510         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
511
512         free( old_things );
513         free( old_names );
514 }
515
516 // ------------ entry update functions ---------------------------------------------------------------
517 /*
518         Given a message type create a route table entry and add to the hash keyed on the
519         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
520         is the number of group slots to allocate in the entry.
521 */
522 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
523         rtable_ent_t* rte;
524         rtable_ent_t* old_rte;          // entry which was already in the table for the key
525
526         if( rt == NULL ) {
527                 return NULL;
528         }
529
530         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
531                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
532                 return NULL;
533         }
534         memset( rte, 0, sizeof( *rte ) );
535         rte->refs = 1;
536         rte->key = key;
537
538         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
539                 nrrgroups = 10;
540         }
541
542         if( nrrgroups ) {
543                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
544                         free( rte );
545                         return NULL;
546                 }
547                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
548         } else {
549                 rte->rrgroups = NULL;
550         }
551
552         rte->nrrgroups = nrrgroups;
553
554         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
555                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
556         }
557
558         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
559
560         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
561         return rte;
562 }
563
564 /*
565         This accepts partially parsed information from an rte or mse record sent by route manager or read from
566         a file such that:
567                 ts_field is the msg-type,sender field
568                 subid is the integer subscription id
569                 rr_field is the endpoint information for round robening message over
570
571         If all goes well, this will add an RTE to the table under construction.
572
573         The ts_field is checked to see if we should ingest this record. We ingest if one of
574         these is true:
575                 there is no sender info (a generic entry for all)
576                 there is sender and our host:port matches one of the senders
577                 the sender info is an IP address that matches one of our IP addresses
578 */
579 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
580         rtable_ent_t*   rte;            // route table entry added
581         char const*     tok;
582         int             ntoks;
583         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
584         char*   tokens[128];
585         char*   gtokens[64];
586         int             ngtoks;                         // number of tokens in the group list
587         int             grp;                            // index into group list
588         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
589         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
590
591         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
592         rr_field = clip( rr_field );
593
594         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
595                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
596                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
597
598                 key = build_rt_key( subid, atoi( ts_field ) );
599
600                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
601
602                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
603                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
604                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
605                         }
606                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
607                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
608
609                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
610                                 int             i;                                      // avoid sonar grumbling by defining this here
611
612                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
613                                         for( i = 0; i < ntoks; i++ ) {
614                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
615                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
616                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
617                                                         has_ep = TRUE;
618                                                 }
619                                         }
620                                         if( has_ep ) {
621                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
622                                                 has_ep = FALSE;
623                                         }
624                                 }
625                         }
626                 }
627         } else {
628                 if( DEBUG || (vlevel > 2) ) {
629                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
630                 }
631         }
632 }
633
634 /*
635         Trash_entry takes a partially parsed record from the input and
636         will delete the entry if the sender,mtype matches us or it's a
637         generic mtype. The refernce in the new table is removed and the
638         refcounter for the actual rte is decreased. If that ref count is
639         0 then the memory is freed (handled byh the del_rte call).
640 */
641 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
642         rtable_ent_t*   rte;            // route table entry to be 'deleted'
643         char const*     tok;
644         int             ntoks;
645         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
646         char*   tokens[128];
647
648         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
649                 return;
650         }
651
652         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
653
654         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
655                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
656                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
657
658                 key = build_rt_key( subid, atoi( ts_field ) );
659                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
660                 if( rte != NULL ) {
661                         if( DEBUG || (vlevel > 1) ) {
662                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
663                         }
664                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
665                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
666                 } else {
667                         if( DEBUG || (vlevel > 1) ) {
668                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
669                         }
670                 }
671         } else {
672                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
673         }
674 }
675
676 // -------------------------- parse functions --------------------------------------------------
677
678 /*
679         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
680         the 'owner' which should be the dns name or IP address of an enpoint
681         the meid_list is a space separated list of me IDs
682
683         This function assumes the caller has vetted the pointers as needed.
684
685         For each meid in the list, an entry is pushed into the hash which references the owner
686         endpoint such that when the meid is used to route a message it references the endpoint
687         to send messages to.
688 */
689 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
690         char const*     tok;
691         int             ntoks;
692         char*   tokens[128];
693         int             i;
694         int             state;
695         endpoint_t*     ep;                                             // endpoint struct for the owner
696
697         owner = clip( owner );                          // ditch extra whitespace and trailing comments
698         meid_list = clip( meid_list );
699
700         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
701         for( i = 0; i < ntoks; i++ ) {
702                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
703                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
704                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
705                 } else {
706                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
707                 }
708         }
709 }
710
711 /*
712         Given the tokens from an mme_del, delete the listed meid entries from the new
713         table. The list is a space separated list of meids.
714
715         The meids in the hash reference endpoints which are never deleted and so
716         the only thing that we need to do here is to remove the meid from the hash.
717
718         This function assumes the caller has vetted the pointers as needed.
719 */
720 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
721         char const*     tok;
722         int             ntoks;
723         char*   tokens[128];
724         int             i;
725
726         if( rtab->hash == NULL ) {
727                 return;
728         }
729
730         meid_list = clip( meid_list );
731
732         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
733         for( i = 0; i < ntoks; i++ ) {
734                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
735                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
736         }
737 }
738
739 /*
740         Parse a partially parsed meid record. Tokens[0] should be one of:
741                 meid_map, mme_ar, mme_del.
742
743         pctx is the private context needed to return an ack/nack using the provided
744         message buffer with the route managers address info.
745 */
746 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
747         char wbuf[1024];
748
749         if( tokens == NULL || ntoks < 1 ) {
750                 return;                                                 // silent but should never happen
751         }
752
753         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
754                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
755                 return;
756         }
757
758         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
759                 tokens[1] = clip( tokens[1] );
760                 if( *(tokens[1]) == 's' ) {
761                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
762                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
763                                 uta_rt_drop( ctx->new_rtable );
764                                 ctx->new_rtable = NULL;
765                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
766                         }
767
768                         if( ctx->table_id != NULL ) {
769                                 free( ctx->table_id );
770                         }
771                         if( ntoks > 2 ) {
772                                 ctx->table_id = strdup( clip( tokens[2] ) );
773                         } else {
774                                 ctx->table_id = NULL;
775                         }
776
777                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
778                         ctx->new_rtable->mupdates = 0;
779
780                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
781                 } else {
782                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
783                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
784                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
785                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
786                                                                 ctx->new_rtable->mupdates, tokens[2] );
787                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
788                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
789                                                 uta_rt_drop( ctx->new_rtable );
790                                                 ctx->new_rtable = NULL;
791                                                 return;
792                                         }
793
794                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
795                                 }
796
797                                 if( ctx->new_rtable ) {
798                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
799                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
800                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
801
802                                         if( vlevel > 0 ) {
803                                                 if( ctx->old_rtable != NULL ) {
804                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
805                                                         rt_stats( ctx->old_rtable );
806                                                 } else {
807                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
808                                                 }
809                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
810                                                 rt_stats( ctx->rtable );
811                                         }
812                                 } else {
813                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
814                                         ctx->new_rtable = NULL;
815                                 }
816                         }
817                 }
818
819                 return;
820         }
821
822         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
823                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
824                 return;
825         }
826
827         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
828                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
829                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
830                         return;
831                 }
832                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
833                 ctx->new_rtable->mupdates++;
834                 return;
835         }
836
837         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
838                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
839                 ctx->new_rtable->mupdates++;
840                 return;
841         }
842 }
843
844 /*
845         This will close the current table snarf file (in *.inc) and open a new one.
846         The curent one is renamed. The final file name is determined by the setting of
847         RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
848         an additional extension of .snarf.  If neither seed or snarf environment vars are
849         set then this does nothing.
850
851         If this is called before the tmp snarf file is opened, then this just opens the file.
852 */
853 static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
854         static int              ok2warn = 0;    // some warnings squelched on first call
855
856         char*   seed_fname;                             // the filename from env
857         char    tfname[512];                    // temp fname
858         char    wfname[512];                    // working buffer for filename
859         char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
860
861         if( ctx == NULL ) {
862                 return;
863         }
864
865         if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
866                 if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
867                         memset( wfname, 0, sizeof( wfname ) );
868                         snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
869                         snarf_fname = wfname;
870                 }
871         }
872
873         if( snarf_fname == NULL ) {
874                 rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
875                 return;
876         }
877
878         memset( tfname, 0, sizeof( tfname ) );
879         snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
880
881         if( ctx->snarf_rt_fd >= 0 ) {
882                 char* msg= "### captured from route manager\n";
883                 write( ctx->snarf_rt_fd, msg, strlen( msg ) );
884                 if( close( ctx->snarf_rt_fd ) < 0 ) {
885                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
886                         return;
887                 }
888
889                 if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
890                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
891                 }
892
893                 if( rename( tfname, snarf_fname ) ) {
894                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
895                 } else {
896                         rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
897                 }
898         }
899         ok2warn = 1;
900
901         ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
902         if( ctx->snarf_rt_fd < 0 ) {
903                 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
904         } else {
905                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
906         }
907 }
908
909 /*
910         Parse a single record recevied from the route table generator, or read
911         from a static route table file.  Start records cause a new table to
912         be started (if a partial table was received it is discarded. Table
913         entry records are added to the currenly 'in progress' table, and an
914         end record causes the in progress table to be finalised and the
915         currently active table is replaced.
916
917         The updated table will be activated when the *|end record is encountered.
918         However, to allow for a "double" update, where both the meid map and the
919         route table must be updated at the same time, the end indication on a
920         route table (new or update) may specifiy "hold" which indicates that meid
921         map entries are to follow and the updated route table should be held as
922         pending until the end of the meid map is received and validated.
923
924         CAUTION:  we are assuming that there is a single route/meid map generator
925                 and as such only one type of update is received at a time; in other
926                 words, the sender cannot mix update records and if there is more than
927                 one sender process they must synchronise to avoid issues.
928
929
930         For a RT update, we expect:
931                 newrt | start | <table-id>
932                 newrt | end | <count>
933                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
934                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
935                 mse| <mtype>[,sender] | <sub-id> | %meid
936
937
938         For a meid map update we expect:
939                 meid_map | start | <table-id>
940                 meid_map | end | <count> | <md5-hash>
941                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
942                 mme_del | <meid0> <meid1>...<meidn>
943
944
945         The pctx is our private context that must be used to send acks/status
946         messages back to the route manager.  The regular ctx is the ctx that
947         the user has been given and thus that's where we have to hang the route
948         table we're working with.
949
950         If mbuf is given, and we need to ack, then we ack using the mbuf and a
951         return to sender call (allows route manager to use wh_call() to send
952         an update and rts is required to get that back to the right thread).
953         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
954         will be used.
955 */
956 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
957         int i;
958         int ntoks;                                                      // number of tokens found in something
959         int ngtoks;
960         int     grp;                                                    // group number
961         rtable_ent_t const*     rte;                    // route table entry added
962         char*   tokens[128];
963         char*   tok=NULL;                                               // pointer into a token or string
964         char    wbuf[1024];
965
966         if( ! buf ) {
967                 return;
968         }
969
970         if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
971                 write( ctx->snarf_rt_fd, buf, strlen( buf ) );
972                 write( ctx->snarf_rt_fd, "\n", 1 );
973         }
974
975         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
976                 buf++;
977         }
978         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
979         *(tok+1) = 0;
980
981         memset( tokens, 0, sizeof( tokens ) );
982         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
983                 tokens[0] = clip( tokens[0] );
984                 switch( *(tokens[0]) ) {
985                         case 0:                                                                                                 // ignore blanks
986                                 // fallthrough
987                         case '#':                                                                                               // and comment lines
988                                 break;
989
990                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
991                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
992                                         break;
993                                 }
994
995                                 if( ntoks < 3 ) {
996                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
997                                         break;
998                                 }
999
1000                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
1001                                 ctx->new_rtable->updates++;
1002                                 break;
1003
1004                         case 'n':                                                                                               // newrt|{start|end}
1005                                 tokens[1] = clip( tokens[1] );
1006                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1007                                         if( ctx && ctx->snarf_rt_fd >= 0 ) {
1008                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1009                                         }
1010
1011                                         if( ntoks >2 ) {
1012                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1013                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1014                                                                 ctx->new_rtable->updates, tokens[2] );
1015                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
1016                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1017                                                         uta_rt_drop( ctx->new_rtable );
1018                                                         ctx->new_rtable = NULL;
1019                                                         break;
1020                                                 }
1021                                         }
1022
1023                                         if( ctx->new_rtable ) {
1024                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1025                                                 if( DEBUG > 1 || (vlevel > 1) ) {
1026                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
1027                                                         dump_tables( ctx );
1028                                                 }
1029
1030                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1031                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1032                                                 ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
1033                                         } else {
1034                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
1035                                                 ctx->new_rtable = NULL;
1036                                         }
1037                                 } else {                                                                                                                        // start a new table.
1038                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
1039                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1040
1041                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1042                                                 uta_rt_drop( ctx->new_rtable );
1043                                                 ctx->new_rtable = NULL;
1044                                         }
1045
1046                                         if( ctx->table_id != NULL ) {
1047                                                 free( ctx->table_id );
1048                                         }
1049                                         if( ntoks >2 ) {
1050                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1051                                         } else {
1052                                                 ctx->table_id = NULL;
1053                                         }
1054
1055                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
1056                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
1057
1058                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
1059                                 }
1060                                 break;
1061
1062                         case 'm':                                                                       // mse entry or one of the meid_ records
1063                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
1064                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1065                                                 break;
1066                                         }
1067
1068                                         if( ntoks < 4 ) {
1069                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
1070                                                 break;
1071                                         }
1072
1073                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
1074                                         ctx->new_rtable->updates++;
1075                                 } else {
1076                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
1077                                 }
1078                                 break;
1079
1080                         case 'r':                                       // assume rt entry
1081                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1082                                         break;
1083                                 }
1084
1085                                 ctx->new_rtable->updates++;
1086                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
1087                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1088                                 } else {
1089                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
1090                                 }
1091                                 break;
1092
1093                         case 'u':                                                                                               // update current table, not a total replacement
1094                                 if( ! (ctx->flags & CFL_FULLRT) ) {                                     // we cannot update until we have a full table from route generator
1095                                         rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
1096                                         break;
1097                                 }
1098
1099                                 tokens[1] = clip( tokens[1] );
1100                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1101                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
1102                                                 break;
1103                                         }
1104                                         if( ctx->snarf_rt_fd >= 0 ) {
1105                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1106                                         }
1107
1108                                         if( ntoks >2 ) {
1109                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1110                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1111                                                                 ctx->new_rtable->updates, tokens[2] );
1112                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1113                                                         uta_rt_drop( ctx->new_rtable );
1114                                                         ctx->new_rtable = NULL;
1115                                                         break;
1116                                                 }
1117                                         }
1118
1119                                         if( ctx->new_rtable ) {
1120                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1121                                                 if( DEBUG > 1 || (vlevel > 1) )  {
1122                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1123                                                         dump_tables( ctx );
1124                                                 }
1125
1126                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1127                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1128                                         } else {
1129                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1130                                                 ctx->new_rtable = NULL;
1131                                         }
1132                                 } else {                                                                                        // start a new table.
1133                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
1134                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1135                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1136                                                 uta_rt_drop( ctx->new_rtable );
1137                                                 ctx->new_rtable = NULL;
1138                                         }
1139
1140                                         if( ntoks > 2 ) {
1141                                                 if( ctx->table_id != NULL ) {
1142                                                         free( ctx->table_id );
1143                                                 }
1144                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1145                                         }
1146
1147                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1148                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1149
1150                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1151                                 }
1152                                 break;
1153
1154                         default:
1155                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1156                                 break;
1157                 }
1158         }
1159 }
1160
1161 /*
1162         This function attempts to open a static route table in order to create a 'seed'
1163         table during initialisation.  The environment variable RMR_SEED_RT is expected
1164         to contain the necessary path to the file. If missing, or if the file is empty,
1165         no route table will be available until one is received from the generator.
1166
1167         This function is probably most useful for testing situations, or extreme
1168         cases where the routes are static.
1169 */
1170 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1171         int             i;
1172         char*   fname;
1173         char*   fbuf;                           // buffer with file contents
1174         char*   rec;                            // start of the record
1175         char*   eor;                            // end of the record
1176         int             rcount = 0;                     // record count for debug
1177
1178         if( (fname = ctx->seed_rt_fname) == NULL ) {
1179                 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1180                         return;
1181                 }
1182
1183                 ctx->seed_rt_fname = strdup( fname );
1184                 fname = ctx->seed_rt_fname;
1185         }
1186
1187         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1188                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1189                 return;
1190         }
1191
1192         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1193         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1194                 if( *eor == '\r' ) {
1195                         *eor = '\n';                                                            // will look like a blank line which is ok
1196                 }
1197         }
1198
1199         rec = fbuf;
1200         while( rec && *rec ) {
1201                 rcount++;
1202                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1203                         *eor = 0;
1204                 } else {
1205                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1206                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1207                         free( fbuf );
1208                         return;
1209                 }
1210
1211                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1212
1213                 rec = eor+1;
1214         }
1215
1216         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1217         free( fbuf );
1218 }
1219
1220 /*
1221         Callback driven for each thing in a symtab. We collect the pointers to those
1222         things for later use (cloning).
1223 */
1224 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1225         thing_list_t*   tl;
1226
1227         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1228                 return;
1229         }
1230
1231         if( thing == NULL ) {
1232                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1233                 return;
1234         }
1235
1236         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1237         tl->things[tl->nused++] = thing;                // save a reference to the thing
1238
1239         if( tl->nused >= tl->nalloc ) {
1240                 extend_things( tl );                            // not enough allocated
1241         }
1242 }
1243
1244 /*
1245         Called to delete a route table entry struct. We delete the array of endpoint
1246         pointers, but NOT the endpoints referenced as those are referenced from
1247         multiple entries.
1248
1249         Route table entries can be concurrently referenced by multiple symtabs, so
1250         the actual delete happens only if decrementing the rte's ref count takes it
1251         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1252         the symtab, or overlaying an entry.
1253
1254         This function uses ONLY the pointer to the rte (thing) and ignores the other
1255         information that symtab foreach function passes (st, entry, and data) which
1256         means that it _can_ safetly be used outside of the foreach setting. If
1257         the function is changed to depend on any of these three, then a stand-alone
1258         rte_cleanup() function should be added and referenced by this, and refererences
1259         to this outside of the foreach world should be changed.
1260 */
1261 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1262         rtable_ent_t*   rte;
1263         int i;
1264
1265         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1266                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1267                 return;
1268         }
1269
1270         rte->refs--;
1271         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1272                 return;
1273         }
1274
1275         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1276                 for( i = 0; i < rte->nrrgroups; i++ ) {
1277                         if( rte->rrgroups[i] ) {
1278                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1279                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1280                         }
1281
1282                 }
1283
1284                 free( rte->rrgroups );
1285         }
1286
1287         free( rte );                                                                                    // finally, drop the potato
1288 }
1289
1290 /*
1291         Read an entire file into a buffer. We assume for route table files
1292         they will be smallish and so this won't be a problem.
1293         Returns a pointer to the buffer, or nil. Caller must free.
1294         Terminates the buffer with a nil character for string processing.
1295
1296         If we cannot stat the file, we assume it's empty or missing and return
1297         an empty buffer, as opposed to a nil, so the caller can generate defaults
1298         or error if an empty/missing file isn't tolerated.
1299 */
1300 static char* uta_fib( char const* fname ) {
1301         struct stat     stats;
1302         off_t           fsize = 8192;   // size of the file
1303         off_t           nread;                  // number of bytes read
1304         int                     fd;
1305         char*           buf;                    // input buffer
1306
1307         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1308                 if( fstat( fd, &stats ) >= 0 ) {
1309                         if( stats.st_size <= 0 ) {                                      // empty file
1310                                 close( fd );
1311                                 fd = -1;
1312                         } else {
1313                                 fsize = stats.st_size;                                          // stat ok, save the file size
1314                         }
1315                 } else {
1316                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1317                 }
1318         }
1319
1320         if( fd < 0 ) {                                                                                  // didn't open or empty
1321                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1322                         return NULL;
1323                 }
1324
1325                 *buf = 0;
1326                 return buf;
1327         }
1328
1329         // add a size limit check here
1330
1331         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1332                 close( fd );
1333                 errno = ENOMEM;
1334                 return NULL;
1335         }
1336
1337         nread = read( fd, buf, fsize );
1338         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1339                 free( buf );
1340                 errno = EFBIG;                                                                                  // likely too much to handle
1341                 close( fd );
1342                 return NULL;
1343         }
1344
1345         buf[nread] = 0;
1346
1347         close( fd );
1348         return buf;
1349 }
1350
1351 // --------------------- initialisation/creation ---------------------------------------------
1352 /*
1353         Create and initialise a route table; Returns a pointer to the table struct.
1354 */
1355 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1356         route_table_t*  rt;
1357
1358         if( ctx == NULL ) {
1359                 return NULL;
1360         }
1361         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1362                 return NULL;
1363         }
1364
1365         memset( rt, 0, sizeof( *rt ) );
1366
1367         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1368                 free( rt );
1369                 return NULL;
1370         }
1371
1372         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1373         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1374         pthread_mutex_init( rt->gate, NULL );
1375
1376         return rt;
1377 }
1378
1379 /*
1380         Clones one of the spaces in the given table.
1381         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1382         Space is the space in the old table to copy. Space 0 uses an integer key and
1383         references rte structs. All other spaces use a string key and reference endpoints.
1384 */
1385 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1386         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1387         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1388         void*   sst;                    // source symtab
1389         void*   nst;                    // new symtab
1390         thing_list_t things;    // things from the space to copy
1391         int             i;
1392         int             free_on_err = 0;
1393
1394         if( ctx == NULL ) {
1395                 return NULL;
1396         }
1397         if( nrt == NULL ) {                             // make a new table if needed
1398                 free_on_err = 1;
1399                 nrt = uta_rt_init( ctx );
1400                 if( nrt == NULL ) {
1401                         return NULL;
1402                 }
1403         }
1404
1405         if( srt == NULL ) {             // source was nil, just give back the new table
1406                 return nrt;
1407         }
1408
1409         things.nalloc = 2048;
1410         things.nused = 0;
1411         things.error = 0;
1412         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1413         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1414         if( things.things == NULL || things.names == NULL ){
1415                 if( things.things != NULL ) {
1416                         free( things.things );
1417                 }
1418                 if( things.names != NULL ) {
1419                         free( things.names );
1420                 }
1421
1422                 if( free_on_err ) {
1423                         rmr_sym_free( nrt->hash );
1424                         free( nrt );
1425                         nrt = NULL;
1426                 } else {
1427                         nrt->error = 1;
1428                 }
1429
1430                 return nrt;
1431         }
1432         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1433         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1434
1435         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1436         nst = nrt->hash;
1437
1438         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1439         if( things.error ) {                            // something happened and capture failed
1440                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1441                 free( things.things );
1442                 free( things.names );
1443                 if( free_on_err ) {
1444                         rmr_sym_free( nrt->hash );
1445                         free( nrt );
1446                         nrt = NULL;
1447                 } else {
1448                         nrt->error = 1;
1449                 }
1450                 return nrt;
1451         }
1452
1453         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1454         for( i = 0; i < things.nused; i++ ) {
1455                 if( space ) {                                                                                           // string key, epoint reference
1456                         ep = (endpoint_t *) things.things[i];
1457                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1458                 } else {
1459                         rte = (rtable_ent_t *) things.things[i];
1460                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1461                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1462                 }
1463         }
1464
1465         free( things.things );
1466         free( (void *) things.names );
1467         return nrt;
1468 }
1469
1470 /*
1471         Given a destination route table (drt), clone from the source (srt) into it.
1472         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1473         allocate the drt if that was nil too). If all is true (1), then we will clone both
1474         the MT and the ME spaces; otherwise only the ME space is cloned.
1475 */
1476 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1477         endpoint_t*             ep;                             // an endpoint
1478         rtable_ent_t*   rte;                    // a route table entry
1479         int i;
1480
1481         if( ctx == NULL ) {
1482                 return NULL;
1483         }
1484         if( drt == NULL ) {
1485                 drt = uta_rt_init( ctx );
1486         }
1487         if( srt == NULL ) {
1488                 return drt;
1489         }
1490
1491         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1492         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1493         if( all ) {
1494                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1495         }
1496
1497         return drt;
1498 }
1499
1500 /*
1501         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1502         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1503         context to be referenced by the new pointer; the old pointer is set to nil.
1504
1505         If the old table doesn't exist, then a new table is created and the new pointer is
1506         set to reference it.
1507
1508         The ME namespace references endpoints which do not need to be released, therefore we
1509         do not need to run that portion of the table to deref like we do for the RTEs.
1510 */
1511 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1512         int counter = 0;
1513         int ref_count;
1514         route_table_t*  rt;
1515
1516         if( ctx == NULL ) {
1517                 return NULL;
1518         }
1519
1520         if( (rt = ctx->old_rtable) != NULL ) {
1521                 ctx->old_rtable = NULL;
1522
1523                 pthread_mutex_lock( ctx->rtgate );
1524                 ref_count = rt->ref_count;
1525                 pthread_mutex_unlock( ctx->rtgate );
1526
1527                 while( ref_count > 0 ) {                                // wait for all who are using to stop
1528                         if( counter++ > 1000 ) {
1529                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1530                                 break;
1531                         }
1532
1533                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1534
1535                         pthread_mutex_lock( ctx->rtgate );
1536                         ref_count = rt->ref_count;
1537                         pthread_mutex_unlock( ctx->rtgate );
1538                 }
1539
1540                 if( rt->hash != NULL ) {
1541                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1542                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1543                 }
1544
1545                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1546         } else {
1547                 rt = NULL;
1548         }
1549
1550         pthread_mutex_destroy(ctx->rtgate);
1551         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1552         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1553                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1554         } else {
1555                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1556                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1557                 rt->error = 1;
1558         }
1559
1560         return rt;
1561 }
1562
1563
1564 /*
1565         Given a name, find the endpoint struct in the provided route table.
1566 */
1567 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1568
1569         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1570                 return NULL;
1571         }
1572
1573         return rmr_sym_get( rt->ephash, ep_name, 1 );
1574 }
1575
1576 /*
1577         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1578         Does NOT destroy the gate as it's a common gate for ALL route tables.
1579 */
1580 static void uta_rt_drop( route_table_t* rt ) {
1581         if( rt == NULL ) {
1582                 return;
1583         }
1584
1585         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1586         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1587         free( rt );
1588 }
1589
1590 /*
1591         Look up and return the pointer to the endpoint stuct matching the given name.
1592         If not in the hash, a new endpoint is created, added to the hash. Should always
1593         return a pointer.
1594 */
1595 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1596         endpoint_t*     ep;
1597
1598         if( !rt || !ep_name || ! *ep_name ) {
1599                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1600                 errno = EINVAL;
1601                 return NULL;
1602         }
1603
1604         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1605                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1606                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1607                         errno = ENOMEM;
1608                         return NULL;
1609                 }
1610
1611                 ep->notify = 1;                                                         // show notification on first connection failure
1612                 ep->open = 0;                                                           // not connected
1613                 ep->addr = uta_h2ip( ep_name );
1614                 ep->name = strdup( ep_name );
1615                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1616                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1617
1618                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1619         }
1620
1621         return ep;
1622 }
1623
1624
1625 /*
1626         Given a session id and message type build a key that can be used to look up the rte in the route
1627         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1628 */
1629 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1630         uint64_t key;
1631
1632         if( sub_id == UNSET_SUBID ) {
1633                 key = 0xffffffff00000000 | mtype;
1634         } else {
1635                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1636         }
1637
1638         return key;
1639 }
1640
1641 /*
1642         Given a route table and meid string, find the owner (if known). Returns a pointer to
1643         the endpoint struct or nil.
1644 */
1645 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1646         endpoint_t const* ep;           // the ep we found in the hash
1647
1648         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1649                 return NULL;
1650         }
1651
1652         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1653 }
1654
1655 /*
1656         This returns a pointer to the currently active route table and ups
1657         the reference count so that the route table is not freed while it
1658         is being used. The caller MUST call release_rt() when finished
1659         with the pointer.
1660
1661         Care must be taken: the ctx->rtable pointer _could_ change during the time
1662         between the release of the lock and the return. Therefore we MUST grab
1663         the current pointer when we have the lock so that if it does we don't
1664         return a pointer to the wrong table.
1665
1666         This will return NULL if there is no active table.
1667 */
1668 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1669         route_table_t*  rrt;                    // return value
1670
1671         if( ctx == NULL || ctx->rtable == NULL ) {
1672                 return NULL;
1673         }
1674
1675         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1676         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1677         rrt->ref_count++;
1678         pthread_mutex_unlock( ctx->rtgate );
1679
1680         return rrt;                                                                             // pointer we upped the count with
1681 }
1682
1683 /*
1684         This will "release" the route table by reducing the use counter
1685         in the table. The table may not be freed until the counter reaches
1686         0, so it's imparative that the pointer be "released" when it is
1687         fetched by get_rt().  Once the caller has released the table it
1688         may not safely use the pointer that it had.
1689 */
1690 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1691         if( ctx == NULL || rt == NULL ) {
1692                 return;
1693         }
1694
1695         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1696         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1697                 rt->ref_count--;
1698         }
1699         pthread_mutex_unlock( ctx->rtgate );
1700 }
1701
1702 #endif