5baf79ae28cda191ef37f43b41075834c8598333
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1  // :vi sw=4 ts=4 noet2
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49 #include <immintrin.h>
50 #include <stdbool.h>
51
52 #include <RIC_message_types.h>          // needed for route manager messages
53
54 #define ALL 1
55 #define SOME 0
56
57 /*
58         Passed to a symtab foreach callback to construct a list of pointers from
59         a current symtab.
60 */
61 typedef struct thing_list {
62         int error;                                      // if a realloc failed, this will be set
63         int nalloc;
64         int nused;
65         void** things;
66         const char** names;
67 } thing_list_t;
68
69 // ---- debugging/testing -------------------------------------------------------------------------
70
71 /*
72         Dump some stats for an endpoint in the RT. This is generally called to
73         verify endpoints after a table load/change.
74
75         This is called by the for-each mechanism of the symtab and the prototype is
76         fixe; we don't really use some of the parms, but have dummy references to
77         keep sonar from complaining.
78 */
79 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
80         int*    counter;
81         endpoint_t* ep;
82
83         if( (ep = (endpoint_t *) thing) == NULL ) {
84                 return;
85         }
86
87         if( (counter = (int *) vcounter) != NULL ) {
88                 (*counter)++;
89         } else {
90                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
91                 return;
92         }
93
94         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
95 }
96
97 /*
98         Called to count meid entries in the table. The meid points to an 'owning' endpoint
99         so we can list what we find
100
101         See note in ep_stats about dummy refs.
102 */
103 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
104         int*    counter;
105         endpoint_t* ep;
106
107         if( (ep = (endpoint_t *) thing) == NULL ) {
108                 return;
109         }
110
111         if( (counter = (int *) vcounter) != NULL ) {
112                 (*counter)++;
113         } else {
114                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
115         }
116
117         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
118 }
119
120 /*
121         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
122         the 'source' information and is added to each message.
123
124         See note above about dummy references.
125 */
126 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
127         endpoint_t* ep;
128         char*   id;
129
130         if( (ep = (endpoint_t *) thing) == NULL ) {
131                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
132                 return;
133         }
134
135         if( (id = (char *) vid) == NULL ) {
136                 id = "missing";
137         }
138
139         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
140                 (long long) time( NULL ),
141                 id,
142                 ep->name,
143                 ep->open,
144                 ep->scounts[EPSC_GOOD],
145                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
146                 ep->scounts[EPSC_FAIL],
147                 ep->scounts[EPSC_TRANS]   );
148 }
149
150 /*
151         Dump stats for a route entry in the table.
152 */
153 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
154         int*    counter;
155         rtable_ent_t const* rte;                // thing is really an rte
156         int             mtype;
157         int             sid;
158
159         if( (rte = (rtable_ent_t *) thing) == NULL ) {
160                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
161                 return;
162         }
163
164         if( (counter = (int *) vcounter) != NULL ) {
165                 (*counter)++;
166         }
167
168         mtype = rte->key & 0xffff;
169         sid = (int) (rte->key >> 32);
170
171         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
172 }
173
174 /*
175         Given a route table, cause some stats to be spit out.
176 */
177 static void  rt_stats( route_table_t* rt ) {
178         int* counter;
179
180         if( rt == NULL ) {
181                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
182                 return;
183         }
184
185         counter = (int *) malloc( sizeof( int ) );
186         *counter = 0;
187         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
188         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
189         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
190         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
191
192         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
193         *counter = 0;
194         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
195         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
196
197         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
198         *counter = 0;
199         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
200         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
201
202         free( counter );
203 }
204
205 /*
206         Given a route table, cause endpoint counters to be written to stderr. The id
207         parm is written as the "source" in the output.
208 */
209 static void  rt_epcounts( route_table_t* rt, char* id ) {
210         if( rt == NULL ) {
211                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
212                 return;
213         }
214
215         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
216 }
217
218
219 static void dump_tables( uta_ctx_t *ctx ) {
220         if( ctx->old_rtable != NULL ) {
221                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
222                 rt_stats( ctx->old_rtable );
223         } else {
224                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
225         }
226         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
227         rt_stats( ctx->rtable );
228 }
229
230 // ------------ route manager communication -------------------------------------------------
231 /*
232         Send a request for a table update to the route manager. Updates come in
233         async, so send and go.
234
235         pctx is the private context for the thread; ctx is the application context
236         that we need to be able to send the application ID in case rt mgr needs to
237         use it to idenfity us.
238
239         Returns 0 if we were not able to send a request.
240 */
241 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
242         rmr_mbuf_t*     smsg;
243         int     state = 0;
244
245         if( ctx->rtg_whid < 0 ) {
246                 return state;
247         }
248
249         smsg = rmr_alloc_msg( pctx, 1024 );
250         if( smsg != NULL ) {
251                 smsg->mtype = RMRRM_REQ_TABLE;
252                 smsg->sub_id = 0;
253                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
254                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
255                 smsg->len = strlen( smsg->payload ) + 1;
256
257                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
258                 if( (state = smsg->state) != RMR_OK ) {
259                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
260                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
261                         ctx->rtg_whid = -1;
262                 }
263
264                 rmr_free_msg( smsg );
265         }
266
267         return state;
268 }
269
270 /*
271         Send an ack to the route table manager for a table ID that we are
272         processing.      State is 1 for OK, and 0 for failed. Reason might
273         be populated if we know why there was a failure.
274
275         Context should be the PRIVATE context that we use for messages
276         to route manger and NOT the user's context.
277
278         If a message buffere is passed we use that and use return to sender
279         assuming that this might be a response to a call and that is needed
280         to send back to the proper calling thread. If msg is nil, we allocate
281         and use it.
282 */
283 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
284         int             use_rts = 1;
285         int             payload_size = 1024;
286
287         if( ctx == NULL || ctx->rtg_whid < 0 ) {
288                 return;
289         }
290
291         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
292                 return;
293         }
294
295         if( smsg != NULL ) {
296                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
297         } else {
298                 use_rts = 0;
299                 smsg = rmr_alloc_msg( ctx, payload_size );
300         }
301
302         if( smsg != NULL ) {
303                 smsg->mtype = RMRRM_TABLE_STATE;
304                 smsg->sub_id = -1;
305                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
306                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
307
308                 smsg->len = strlen( smsg->payload ) + 1;
309
310                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
311                 if( use_rts ) {
312                         smsg = rmr_rts_msg( ctx, smsg );
313                 } else {
314                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
315                 }
316                 if( (state = smsg->state) != RMR_OK ) {
317                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
318                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
319                         ctx->rtg_whid = -1;
320                 }
321
322                 if( ! use_rts ) {
323                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
324                 }
325         }
326 }
327
328 // ---- alarm generation --------------------------------------------------------------------------
329
330 /*
331         Given the user's context (not the thread private context) look to see if the application isn't
332         working fast enough and we're dropping messages. If the drop counter has changed since the last
333         peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
334         set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
335
336         The private context is what we use to send so as not to interfere with the user flow.
337 */
338 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
339         static  int alarm_raised = 0;
340         static  int ok2clear = 0;                                       // time that we can clear
341         static  int lastd = 0;                                          // the last counter value so we can compute delta
342         static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
343
344         rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
345         if( ! alarm_raised ) {
346                 if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
347                         return;
348                 }
349
350                 alarm_raised = 1;
351                 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
352                 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
353         } else {
354                 if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
355                         lastd = uctx->dcount;
356                         ok2clear = 0;                                                   // reset the timer
357                         return;
358                 }
359
360                 if( ok2clear == 0 ) {                                           // first round where not dropping
361                         ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
362                 } else {
363                         if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
364                                 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
365                                 alarm_raised = 0;
366                                 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
367                                 prob_id++;
368                         }
369                 }
370         }
371 }
372
373 // ---- utility -----------------------------------------------------------------------------------
374
375 int isspace_with_fence(int c) {
376         _mm_lfence();
377         return isspace( c );
378 }
379
380 /*
381         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
382         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
383 */
384 static char* clip( char* buf ) {
385         char*   tok=NULL;
386
387         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
388                 buf++;
389         }
390
391         if( (tok = strchr( buf, '#' )) != NULL ) {
392                 if( tok == buf ) {
393                         return buf;                                     // just push back; leading comment sym handled there
394                 }
395
396                 if( isspace( *(tok-1) ) ) {
397                         *tok = 0;
398                 }
399         }
400
401         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
402         *(tok+1) = 0;
403
404         return buf;
405 }
406
407 /*
408         This accepts a pointer to a nil terminated string, and ensures that there is a
409         newline as the last character. If there is not, a new buffer is allocated and
410         the newline is added.  If a new buffer is allocated, the buffer passed in is
411         freed.  The function returns a pointer which the caller should use, and must
412         free.  In the event of an error, a nil pointer is returned.
413 */
414 static char* ensure_nlterm( char* buf ) {
415         char*   nb = NULL;
416         int             len = 0;
417
418         if( buf != NULL ) {
419                 len = strlen( buf );
420         }
421
422         nb = buf;                                                       // default to returning original as is
423         switch( len ) {
424                 case 0:
425                         nb = strdup( "\n" );
426                         break;
427
428                 case 1:
429                         if( *buf != '\n' ) {            // not a newline; realloc
430                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
431                                 nb = strdup( " \n" );
432                                 *nb = *buf;
433                                 free( buf );
434                         }
435                         break;
436
437                 default:
438                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
439                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
440                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
441                                         memcpy( nb, buf, len );
442                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
443                                         *(nb+len+1) = 0;
444                                         free( buf );
445                                 }
446                         }
447                         break;
448         }
449
450         return nb;
451 }
452
453 /*
454         Roll the new table into the active and the active into the old table. We
455         must have the lock on the active table to do this. It's possible that there
456         is no active table (first load), so we have to account for that (no locking).
457 */
458 static void roll_tables( uta_ctx_t* ctx ) {
459         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
460         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
461                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
462                 ctx->old_rtable = ctx->new_rtable;
463         }else if( ctx->rtable != NULL ) {                                                       // initially there isn't one, so must check!
464                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
465                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
466         } else {
467                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
468                 ctx->rtable = ctx->new_rtable;                          // make new the active one
469         }
470         ctx->new_rtable = NULL;
471         pthread_mutex_unlock( ctx->rtgate );
472 }
473
474 /*
475         Given a thing list, extend the array of pointers by 1/2 of the current
476         number allocated. If we cannot realloc an array, then we set the error
477         flag.  Unlikely, but will prevent a crash, AND will prevent us from
478         trying to use the table since we couldn't capture everything.
479 */
480 static void extend_things( thing_list_t* tl ) {
481         int old_alloc;
482         void*   old_things;
483         void*   old_names;
484
485         if( tl == NULL ) {
486                 return;
487         }
488
489         old_alloc = tl->nalloc;                         // capture current things
490         old_things = tl->things;
491         old_names = tl->names;
492
493         tl->nalloc += tl->nalloc/2;                     // new allocation size
494
495         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
496         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
497
498         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
499                 tl->error = 1;
500                 return;
501         }
502
503         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
504         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
505
506         free( old_things );
507         free( old_names );
508 }
509
510 // ------------ entry update functions ---------------------------------------------------------------
511 /*
512         Given a message type create a route table entry and add to the hash keyed on the
513         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
514         is the number of group slots to allocate in the entry.
515 */
516 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
517         rtable_ent_t* rte;
518         rtable_ent_t* old_rte;          // entry which was already in the table for the key
519
520         if( rt == NULL ) {
521                 return NULL;
522         }
523
524         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
525                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
526                 return NULL;
527         }
528         memset( rte, 0, sizeof( *rte ) );
529         rte->refs = 1;
530         rte->key = key;
531
532         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
533                 nrrgroups = 10;
534         }
535
536         if( nrrgroups ) {
537                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
538                         free( rte );
539                         return NULL;
540                 }
541                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
542         } else {
543                 rte->rrgroups = NULL;
544         }
545
546         rte->nrrgroups = nrrgroups;
547
548         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
549                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
550         }
551
552         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
553
554         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
555         return rte;
556 }
557
558 /*
559         This accepts partially parsed information from an rte or mse record sent by route manager or read from
560         a file such that:
561                 ts_field is the msg-type,sender field
562                 subid is the integer subscription id
563                 rr_field is the endpoint information for round robening message over
564
565         If all goes well, this will add an RTE to the table under construction.
566
567         The ts_field is checked to see if we should ingest this record. We ingest if one of
568         these is true:
569                 there is no sender info (a generic entry for all)
570                 there is sender and our host:port matches one of the senders
571                 the sender info is an IP address that matches one of our IP addresses
572 */
573 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
574         rtable_ent_t*   rte;            // route table entry added
575         char const*     tok;
576         int             ntoks;
577         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
578         char*   tokens[128];
579         char*   gtokens[64];
580         int             ngtoks;                         // number of tokens in the group list
581         int             grp;                            // index into group list
582         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
583         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
584
585         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
586         rr_field = clip( rr_field );
587
588         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
589                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
590                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
591
592                 key = build_rt_key( subid, atoi( ts_field ) );
593
594                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
595
596                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
597                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
598                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
599                         }
600                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
601                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
602
603                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
604                                 int             i;                                      // avoid sonar grumbling by defining this here
605
606                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
607                                         for( i = 0; i < ntoks; i++ ) {
608                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
609                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
610                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
611                                                         has_ep = TRUE;
612                                                 }
613                                         }
614                                         if( has_ep ) {
615                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
616                                                 has_ep = FALSE;
617                                         }
618                                 }
619                         }
620                 }
621         } else {
622                 if( DEBUG || (vlevel > 2) ) {
623                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
624                 }
625         }
626 }
627
628 /*
629         Trash_entry takes a partially parsed record from the input and
630         will delete the entry if the sender,mtype matches us or it's a
631         generic mtype. The refernce in the new table is removed and the
632         refcounter for the actual rte is decreased. If that ref count is
633         0 then the memory is freed (handled byh the del_rte call).
634 */
635 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
636         rtable_ent_t*   rte;            // route table entry to be 'deleted'
637         char const*     tok;
638         int             ntoks;
639         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
640         char*   tokens[128];
641
642         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
643                 return;
644         }
645
646         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
647
648         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
649                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
650                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
651
652                 key = build_rt_key( subid, atoi( ts_field ) );
653                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
654                 if( rte != NULL ) {
655                         if( DEBUG || (vlevel > 1) ) {
656                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
657                         }
658                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
659                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
660                 } else {
661                         if( DEBUG || (vlevel > 1) ) {
662                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
663                         }
664                 }
665         } else {
666                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
667         }
668 }
669
670 // -------------------------- parse functions --------------------------------------------------
671
672 /*
673         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
674         the 'owner' which should be the dns name or IP address of an enpoint
675         the meid_list is a space separated list of me IDs
676
677         This function assumes the caller has vetted the pointers as needed.
678
679         For each meid in the list, an entry is pushed into the hash which references the owner
680         endpoint such that when the meid is used to route a message it references the endpoint
681         to send messages to.
682 */
683 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
684         char const*     tok;
685         int             ntoks;
686         char*   tokens[128];
687         int             i;
688         int             state;
689         endpoint_t*     ep;                                             // endpoint struct for the owner
690
691         owner = clip( owner );                          // ditch extra whitespace and trailing comments
692         meid_list = clip( meid_list );
693
694         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
695         for( i = 0; i < ntoks; i++ ) {
696                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
697                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
698                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
699                 } else {
700                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
701                 }
702         }
703 }
704
705 /*
706         Given the tokens from an mme_del, delete the listed meid entries from the new
707         table. The list is a space separated list of meids.
708
709         The meids in the hash reference endpoints which are never deleted and so
710         the only thing that we need to do here is to remove the meid from the hash.
711
712         This function assumes the caller has vetted the pointers as needed.
713 */
714 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
715         char const*     tok;
716         int             ntoks;
717         char*   tokens[128];
718         int             i;
719
720         if( rtab->hash == NULL ) {
721                 return;
722         }
723
724         meid_list = clip( meid_list );
725
726         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
727         for( i = 0; i < ntoks; i++ ) {
728                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
729                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
730         }
731 }
732
733 /*
734         Parse a partially parsed meid record. Tokens[0] should be one of:
735                 meid_map, mme_ar, mme_del.
736
737         pctx is the private context needed to return an ack/nack using the provided
738         message buffer with the route managers address info.
739 */
740 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
741         char wbuf[1024];
742
743         if( tokens == NULL || ntoks < 1 ) {
744                 return;                                                 // silent but should never happen
745         }
746
747         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
748                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
749                 return;
750         }
751
752         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
753                 tokens[1] = clip( tokens[1] );
754                 if( *(tokens[1]) == 's' ) {
755                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
756                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
757                                 uta_rt_drop( ctx->new_rtable );
758                                 ctx->new_rtable = NULL;
759                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
760                         }
761
762                         if( ctx->table_id != NULL ) {
763                                 free( ctx->table_id );
764                         }
765                         if( ntoks > 2 ) {
766                                 ctx->table_id = strdup( clip( tokens[2] ) );
767                         } else {
768                                 ctx->table_id = NULL;
769                         }
770
771                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
772                         ctx->new_rtable->mupdates = 0;
773
774                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
775                 } else {
776                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
777                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
778                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
779                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
780                                                                 ctx->new_rtable->mupdates, tokens[2] );
781                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
782                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
783                                                 uta_rt_drop( ctx->new_rtable );
784                                                 ctx->new_rtable = NULL;
785                                                 return;
786                                         }
787
788                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
789                                 }
790
791                                 if( ctx->new_rtable ) {
792                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
793                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
794                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
795
796                                         if( vlevel > 0 ) {
797                                                 if( ctx->old_rtable != NULL ) {
798                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
799                                                         rt_stats( ctx->old_rtable );
800                                                 } else {
801                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
802                                                 }
803                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
804                                                 rt_stats( ctx->rtable );
805                                         }
806                                 } else {
807                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
808                                         ctx->new_rtable = NULL;
809                                 }
810                         }
811                 }
812
813                 return;
814         }
815
816         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
817                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
818                 return;
819         }
820
821         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
822                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
823                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
824                         return;
825                 }
826                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
827                 ctx->new_rtable->mupdates++;
828                 return;
829         }
830
831         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
832                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
833                 ctx->new_rtable->mupdates++;
834                 return;
835         }
836 }
837
838 /*
839         This will close the current table snarf file (in *.inc) and open a new one.
840         The curent one is renamed. The final file name is determined by the setting of
841         RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
842         an additional extension of .snarf.  If neither seed or snarf environment vars are
843         set then this does nothing.
844
845         If this is called before the tmp snarf file is opened, then this just opens the file.
846 */
847 static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
848         static int              ok2warn = 0;    // some warnings squelched on first call
849
850         char*   seed_fname;                             // the filename from env
851         char    tfname[512];                    // temp fname
852         char    wfname[512];                    // working buffer for filename
853         char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
854
855         if( ctx == NULL ) {
856                 return;
857         }
858
859         if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
860                 if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
861                         memset( wfname, 0, sizeof( wfname ) );
862                         snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
863                         snarf_fname = wfname;
864                 }
865         }
866
867         if( snarf_fname == NULL ) {
868                 rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
869                 return;
870         }
871
872         memset( tfname, 0, sizeof( tfname ) );
873         snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
874
875         if( ctx->snarf_rt_fd >= 0 ) {
876                 char* msg= "### captured from route manager\n";
877                 write( ctx->snarf_rt_fd, msg, strlen( msg ) );
878                 if( close( ctx->snarf_rt_fd ) < 0 ) {
879                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
880                         return;
881                 }
882
883                 if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
884                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
885                 }
886
887                 if( rename( tfname, snarf_fname ) ) {
888                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
889                 } else {
890                         rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
891                 }
892         }
893         ok2warn = 1;
894
895         ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
896         if( ctx->snarf_rt_fd < 0 ) {
897                 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
898         } else {
899                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
900         }
901 }
902
903 /*
904         Parse a single record recevied from the route table generator, or read
905         from a static route table file.  Start records cause a new table to
906         be started (if a partial table was received it is discarded. Table
907         entry records are added to the currenly 'in progress' table, and an
908         end record causes the in progress table to be finalised and the
909         currently active table is replaced.
910
911         The updated table will be activated when the *|end record is encountered.
912         However, to allow for a "double" update, where both the meid map and the
913         route table must be updated at the same time, the end indication on a
914         route table (new or update) may specifiy "hold" which indicates that meid
915         map entries are to follow and the updated route table should be held as
916         pending until the end of the meid map is received and validated.
917
918         CAUTION:  we are assuming that there is a single route/meid map generator
919                 and as such only one type of update is received at a time; in other
920                 words, the sender cannot mix update records and if there is more than
921                 one sender process they must synchronise to avoid issues.
922
923
924         For a RT update, we expect:
925                 newrt | start | <table-id>
926                 newrt | end | <count>
927                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
928                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
929                 mse| <mtype>[,sender] | <sub-id> | %meid
930
931
932         For a meid map update we expect:
933                 meid_map | start | <table-id>
934                 meid_map | end | <count> | <md5-hash>
935                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
936                 mme_del | <meid0> <meid1>...<meidn>
937
938
939         The pctx is our private context that must be used to send acks/status
940         messages back to the route manager.  The regular ctx is the ctx that
941         the user has been given and thus that's where we have to hang the route
942         table we're working with.
943
944         If mbuf is given, and we need to ack, then we ack using the mbuf and a
945         return to sender call (allows route manager to use wh_call() to send
946         an update and rts is required to get that back to the right thread).
947         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
948         will be used.
949 */
950 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
951         int i;
952         int ntoks;                                                      // number of tokens found in something
953         int ngtoks;
954         int     grp;                                                    // group number
955         rtable_ent_t const*     rte;                    // route table entry added
956         char*   tokens[128];
957         char*   tok=NULL;                                               // pointer into a token or string
958         char    wbuf[1024];
959
960         if( ! buf ) {
961                 return;
962         }
963
964         if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
965                 write( ctx->snarf_rt_fd, buf, strlen( buf ) );
966                 write( ctx->snarf_rt_fd, "\n", 1 );
967         }
968
969         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
970                 buf++;
971         }
972         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
973         *(tok+1) = 0;
974
975         memset( tokens, 0, sizeof( tokens ) );
976         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
977                 tokens[0] = clip( tokens[0] );
978                 switch( *(tokens[0]) ) {
979                         case 0:                                                                                                 // ignore blanks
980                                 // fallthrough
981                         case '#':                                                                                               // and comment lines
982                                 break;
983
984                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
985                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
986                                         break;
987                                 }
988
989                                 if( ntoks < 3 ) {
990                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
991                                         break;
992                                 }
993
994                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
995                                 ctx->new_rtable->updates++;
996                                 break;
997
998                         case 'n':                                                                                               // newrt|{start|end}
999                                 tokens[1] = clip( tokens[1] );
1000                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1001                                         if( ctx && ctx->snarf_rt_fd >= 0 ) {
1002                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1003                                         }
1004
1005                                         if( ntoks >2 ) {
1006                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1007                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1008                                                                 ctx->new_rtable->updates, tokens[2] );
1009                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
1010                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1011                                                         uta_rt_drop( ctx->new_rtable );
1012                                                         ctx->new_rtable = NULL;
1013                                                         break;
1014                                                 }
1015                                         }
1016
1017                                         if( ctx->new_rtable ) {
1018                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1019                                                 if( DEBUG > 1 || (vlevel > 1) ) {
1020                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
1021                                                         dump_tables( ctx );
1022                                                 }
1023
1024                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1025                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1026                                                 ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
1027                                         } else {
1028                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
1029                                                 ctx->new_rtable = NULL;
1030                                         }
1031                                 } else {                                                                                                                        // start a new table.
1032                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
1033                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1034
1035                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1036                                                 uta_rt_drop( ctx->new_rtable );
1037                                                 ctx->new_rtable = NULL;
1038                                         }
1039
1040                                         if( ctx->table_id != NULL ) {
1041                                                 free( ctx->table_id );
1042                                         }
1043                                         if( ntoks >2 ) {
1044                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1045                                         } else {
1046                                                 ctx->table_id = NULL;
1047                                         }
1048
1049                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
1050                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
1051
1052                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
1053                                 }
1054                                 break;
1055
1056                         case 'm':                                                                       // mse entry or one of the meid_ records
1057                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
1058                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1059                                                 break;
1060                                         }
1061
1062                                         if( ntoks < 4 ) {
1063                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
1064                                                 break;
1065                                         }
1066
1067                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
1068                                         ctx->new_rtable->updates++;
1069                                 } else {
1070                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
1071                                 }
1072                                 break;
1073
1074                         case 'r':                                       // assume rt entry
1075                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1076                                         break;
1077                                 }
1078
1079                                 ctx->new_rtable->updates++;
1080                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
1081                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1082                                 } else {
1083                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
1084                                 }
1085                                 break;
1086
1087                         case 'u':                                                                                               // update current table, not a total replacement
1088                                 if( ! (ctx->flags & CFL_FULLRT) ) {                                     // we cannot update until we have a full table from route generator
1089                                         rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
1090                                         break;
1091                                 }
1092
1093                                 tokens[1] = clip( tokens[1] );
1094                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1095                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
1096                                                 break;
1097                                         }
1098                                         if( ctx->snarf_rt_fd >= 0 ) {
1099                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1100                                         }
1101
1102                                         if( ntoks >2 ) {
1103                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1104                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1105                                                                 ctx->new_rtable->updates, tokens[2] );
1106                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1107                                                         uta_rt_drop( ctx->new_rtable );
1108                                                         ctx->new_rtable = NULL;
1109                                                         break;
1110                                                 }
1111                                         }
1112
1113                                         if( ctx->new_rtable ) {
1114                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1115                                                 if( DEBUG > 1 || (vlevel > 1) )  {
1116                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1117                                                         dump_tables( ctx );
1118                                                 }
1119
1120                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1121                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1122                                         } else {
1123                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1124                                                 ctx->new_rtable = NULL;
1125                                         }
1126                                 } else {                                                                                        // start a new table.
1127                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
1128                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1129                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1130                                                 uta_rt_drop( ctx->new_rtable );
1131                                                 ctx->new_rtable = NULL;
1132                                         }
1133
1134                                         if( ntoks > 2 ) {
1135                                                 if( ctx->table_id != NULL ) {
1136                                                         free( ctx->table_id );
1137                                                 }
1138                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1139                                         }
1140
1141                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1142                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1143
1144                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1145                                 }
1146                                 break;
1147
1148                         default:
1149                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1150                                 break;
1151                 }
1152         }
1153 }
1154
1155 /*
1156         This function attempts to open a static route table in order to create a 'seed'
1157         table during initialisation.  The environment variable RMR_SEED_RT is expected
1158         to contain the necessary path to the file. If missing, or if the file is empty,
1159         no route table will be available until one is received from the generator.
1160
1161         This function is probably most useful for testing situations, or extreme
1162         cases where the routes are static.
1163 */
1164 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1165         int             i;
1166         char*   fname;
1167         char*   fbuf;                           // buffer with file contents
1168         char*   rec;                            // start of the record
1169         char*   eor;                            // end of the record
1170         int             rcount = 0;                     // record count for debug
1171
1172         if( (fname = ctx->seed_rt_fname) == NULL ) {
1173                 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1174                         return;
1175                 }
1176
1177                 ctx->seed_rt_fname = strdup( fname );
1178                 fname = ctx->seed_rt_fname;
1179         }
1180
1181         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1182                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1183                 return;
1184         }
1185
1186         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1187         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1188                 if( *eor == '\r' ) {
1189                         *eor = '\n';                                                            // will look like a blank line which is ok
1190                 }
1191         }
1192
1193         rec = fbuf;
1194         while( rec && *rec ) {
1195                 rcount++;
1196                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1197                         *eor = 0;
1198                 } else {
1199                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1200                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1201                         free( fbuf );
1202                         return;
1203                 }
1204
1205                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1206
1207                 rec = eor+1;
1208         }
1209
1210         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1211         free( fbuf );
1212 }
1213
1214 /*
1215         Callback driven for each thing in a symtab. We collect the pointers to those
1216         things for later use (cloning).
1217 */
1218 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1219         thing_list_t*   tl;
1220
1221         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1222                 return;
1223         }
1224
1225         if( thing == NULL ) {
1226                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1227                 return;
1228         }
1229
1230         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1231         tl->things[tl->nused++] = thing;                // save a reference to the thing
1232
1233         if( tl->nused >= tl->nalloc ) {
1234                 extend_things( tl );                            // not enough allocated
1235         }
1236 }
1237
1238 /*
1239         Called to delete a route table entry struct. We delete the array of endpoint
1240         pointers, but NOT the endpoints referenced as those are referenced from
1241         multiple entries.
1242
1243         Route table entries can be concurrently referenced by multiple symtabs, so
1244         the actual delete happens only if decrementing the rte's ref count takes it
1245         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1246         the symtab, or overlaying an entry.
1247
1248         This function uses ONLY the pointer to the rte (thing) and ignores the other
1249         information that symtab foreach function passes (st, entry, and data) which
1250         means that it _can_ safetly be used outside of the foreach setting. If
1251         the function is changed to depend on any of these three, then a stand-alone
1252         rte_cleanup() function should be added and referenced by this, and refererences
1253         to this outside of the foreach world should be changed.
1254 */
1255 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1256         rtable_ent_t*   rte;
1257         int i;
1258
1259         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1260                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1261                 return;
1262         }
1263
1264         rte->refs--;
1265         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1266                 return;
1267         }
1268
1269         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1270                 for( i = 0; i < rte->nrrgroups; i++ ) {
1271                         if( rte->rrgroups[i] ) {
1272                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1273                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1274                         }
1275
1276                 }
1277
1278                 free( rte->rrgroups );
1279         }
1280
1281         free( rte );                                                                                    // finally, drop the potato
1282 }
1283
1284 /*
1285         Read an entire file into a buffer. We assume for route table files
1286         they will be smallish and so this won't be a problem.
1287         Returns a pointer to the buffer, or nil. Caller must free.
1288         Terminates the buffer with a nil character for string processing.
1289
1290         If we cannot stat the file, we assume it's empty or missing and return
1291         an empty buffer, as opposed to a nil, so the caller can generate defaults
1292         or error if an empty/missing file isn't tolerated.
1293 */
1294 static char* uta_fib( char const* fname ) {
1295         struct stat     stats;
1296         off_t           fsize = 8192;   // size of the file
1297         off_t           nread;                  // number of bytes read
1298         int                     fd;
1299         char*           buf;                    // input buffer
1300
1301         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1302                 if( fstat( fd, &stats ) >= 0 ) {
1303                         if( stats.st_size <= 0 ) {                                      // empty file
1304                                 close( fd );
1305                                 fd = -1;
1306                         } else {
1307                                 fsize = stats.st_size;                                          // stat ok, save the file size
1308                         }
1309                 } else {
1310                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1311                 }
1312         }
1313
1314         if( fd < 0 ) {                                                                                  // didn't open or empty
1315                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1316                         return NULL;
1317                 }
1318
1319                 *buf = 0;
1320                 return buf;
1321         }
1322
1323         // add a size limit check here
1324
1325         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1326                 close( fd );
1327                 errno = ENOMEM;
1328                 return NULL;
1329         }
1330
1331         nread = read( fd, buf, fsize );
1332         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1333                 free( buf );
1334                 errno = EFBIG;                                                                                  // likely too much to handle
1335                 close( fd );
1336                 return NULL;
1337         }
1338
1339         buf[nread] = 0;
1340
1341         close( fd );
1342         return buf;
1343 }
1344
1345 // --------------------- initialisation/creation ---------------------------------------------
1346 /*
1347         Create and initialise a route table; Returns a pointer to the table struct.
1348 */
1349 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1350         route_table_t*  rt;
1351
1352         if( ctx == NULL ) {
1353                 return NULL;
1354         }
1355         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1356                 return NULL;
1357         }
1358
1359         memset( rt, 0, sizeof( *rt ) );
1360
1361         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1362                 free( rt );
1363                 return NULL;
1364         }
1365
1366         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1367         return rt;
1368 }
1369
1370 /*
1371         Clones one of the spaces in the given table.
1372         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1373         Space is the space in the old table to copy. Space 0 uses an integer key and
1374         references rte structs. All other spaces use a string key and reference endpoints.
1375 */
1376 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1377         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1378         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1379         void*   sst;                    // source symtab
1380         void*   nst;                    // new symtab
1381         thing_list_t things;    // things from the space to copy
1382         int             i;
1383         int             free_on_err = 0;
1384
1385         if( ctx == NULL ) {
1386                 return NULL;
1387         }
1388         if( nrt == NULL ) {                             // make a new table if needed
1389                 free_on_err = 1;
1390                 nrt = uta_rt_init( ctx );
1391                 if( nrt == NULL ) {
1392                         return NULL;
1393                 }
1394         }
1395
1396         if( srt == NULL ) {             // source was nil, just give back the new table
1397                 return nrt;
1398         }
1399
1400         things.nalloc = 2048;
1401         things.nused = 0;
1402         things.error = 0;
1403         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1404         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1405         if( things.things == NULL || things.names == NULL ){
1406                 if( things.things != NULL ) {
1407                         free( things.things );
1408                 }
1409                 if( things.names != NULL ) {
1410                         free( things.names );
1411                 }
1412
1413                 if( free_on_err ) {
1414                         rmr_sym_free( nrt->hash );
1415                         free( nrt );
1416                         nrt = NULL;
1417                 } else {
1418                         nrt->error = 1;
1419                 }
1420
1421                 return nrt;
1422         }
1423         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1424         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1425
1426         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1427         nst = nrt->hash;
1428
1429         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1430         if( things.error ) {                            // something happened and capture failed
1431                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1432                 free( things.things );
1433                 free( things.names );
1434                 if( free_on_err ) {
1435                         rmr_sym_free( nrt->hash );
1436                         free( nrt );
1437                         nrt = NULL;
1438                 } else {
1439                         nrt->error = 1;
1440                 }
1441                 return nrt;
1442         }
1443
1444         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1445         for( i = 0; i < things.nused; i++ ) {
1446                 if( space ) {                                                                                           // string key, epoint reference
1447                         ep = (endpoint_t *) things.things[i];
1448                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1449                 } else {
1450                         rte = (rtable_ent_t *) things.things[i];
1451                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1452                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1453                 }
1454         }
1455
1456         free( things.things );
1457         free( (void *) things.names );
1458         return nrt;
1459 }
1460
1461 /*
1462         Given a destination route table (drt), clone from the source (srt) into it.
1463         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1464         allocate the drt if that was nil too). If all is true (1), then we will clone both
1465         the MT and the ME spaces; otherwise only the ME space is cloned.
1466 */
1467 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1468         endpoint_t*             ep;                             // an endpoint
1469         rtable_ent_t*   rte;                    // a route table entry
1470         int i;
1471
1472         if( ctx == NULL ) {
1473                 return NULL;
1474         }
1475         if( drt == NULL ) {
1476                 drt = uta_rt_init( ctx );
1477         }
1478         if( srt == NULL ) {
1479                 return drt;
1480         }
1481
1482         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1483         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1484         if( all ) {
1485                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1486         }
1487
1488         return drt;
1489 }
1490
1491 /*
1492         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1493         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1494         context to be referenced by the new pointer; the old pointer is set to nil.
1495
1496         If the old table doesn't exist, then a new table is created and the new pointer is
1497         set to reference it.
1498
1499         The ME namespace references endpoints which do not need to be released, therefore we
1500         do not need to run that portion of the table to deref like we do for the RTEs.
1501 */
1502 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1503         //int counter = 0;
1504         route_table_t*  rt;
1505
1506         if( ctx == NULL ) {
1507                 return NULL;
1508         }
1509
1510         pthread_mutex_lock( ctx->rtgate );
1511         if( (rt = ctx->old_rtable) != NULL ) {
1512                 ctx->old_rtable = NULL;
1513
1514                 while(  rt->ref_count > 0 ) {                           // wait for all who are using to stop
1515                         //if( counter++ > 1000 ) {
1516                         //      rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1517                         //      break;
1518                         //}
1519
1520                         pthread_mutex_unlock( ctx->rtgate );
1521                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1522                         pthread_mutex_lock( ctx->rtgate );
1523
1524                 }
1525
1526                 if( rt->hash != NULL ) {
1527                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1528                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1529                 }
1530
1531                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1532         } else {
1533                 rt = NULL;
1534         }
1535         pthread_mutex_unlock( ctx->rtgate );
1536
1537         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1538         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1539                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1540         } else {
1541                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1542                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1543                 rt->error = 1;
1544         }
1545
1546         return rt;
1547 }
1548
1549
1550 /*
1551         Given a name, find the endpoint struct in the provided route table.
1552 */
1553 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1554
1555         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1556                 return NULL;
1557         }
1558
1559         return rmr_sym_get( rt->ephash, ep_name, 1 );
1560 }
1561
1562 /*
1563         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1564         Does NOT destroy the gate as it's a common gate for ALL route tables.
1565 */
1566 static void uta_rt_drop( route_table_t* rt ) {
1567         if( rt == NULL ) {
1568                 return;
1569         }
1570
1571         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1572         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1573         free( rt );
1574 }
1575
1576 /*
1577         Look up and return the pointer to the endpoint stuct matching the given name.
1578         If not in the hash, a new endpoint is created, added to the hash. Should always
1579         return a pointer.
1580 */
1581 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1582         endpoint_t*     ep;
1583
1584         if( !rt || !ep_name || ! *ep_name ) {
1585                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1586                 errno = EINVAL;
1587                 return NULL;
1588         }
1589
1590         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1591                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1592                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1593                         errno = ENOMEM;
1594                         return NULL;
1595                 }
1596
1597                 ep->notify = 1;                                                         // show notification on first connection failure
1598                 ep->open = 0;                                                           // not connected
1599                 ep->addr = uta_h2ip( ep_name );
1600                 ep->name = strdup( ep_name );
1601                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1602                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1603
1604                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1605         }
1606
1607         return ep;
1608 }
1609
1610
1611 /*
1612         Given a session id and message type build a key that can be used to look up the rte in the route
1613         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1614 */
1615 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1616         uint64_t key;
1617
1618         if( sub_id == UNSET_SUBID ) {
1619                 key = 0xffffffff00000000 | mtype;
1620         } else {
1621                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1622         }
1623
1624         return key;
1625 }
1626
1627 /*
1628         Given a route table and meid string, find the owner (if known). Returns a pointer to
1629         the endpoint struct or nil.
1630 */
1631 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1632         endpoint_t const* ep;           // the ep we found in the hash
1633
1634         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1635                 return NULL;
1636         }
1637
1638         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1639 }
1640
1641 /*
1642         This returns a pointer to the currently active route table and ups
1643         the reference count so that the route table is not freed while it
1644         is being used. The caller MUST call release_rt() when finished
1645         with the pointer.
1646
1647         Care must be taken: the ctx->rtable pointer _could_ change during the time
1648         between the release of the lock and the return. Therefore we MUST grab
1649         the current pointer when we have the lock so that if it does we don't
1650         return a pointer to the wrong table.
1651
1652         This will return NULL if there is no active table.
1653 */
1654 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1655         route_table_t*  rrt;                    // return value
1656
1657         if( ctx == NULL || ctx->rtable == NULL ) {
1658                 return NULL;
1659         }
1660
1661         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1662         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1663         rrt->ref_count++;
1664         pthread_mutex_unlock( ctx->rtgate );
1665
1666         return rrt;                                                                             // pointer we upped the count with
1667 }
1668
1669 /*
1670         This will "release" the route table by reducing the use counter
1671         in the table. The table may not be freed until the counter reaches
1672         0, so it's imparative that the pointer be "released" when it is
1673         fetched by get_rt().  Once the caller has released the table it
1674         may not safely use the pointer that it had.
1675 */
1676 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1677         if( ctx == NULL || rt == NULL ) {
1678                 return;
1679         }
1680
1681         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1682         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1683                 rt->ref_count--;
1684         }
1685         pthread_mutex_unlock( ctx->rtgate );
1686 }
1687
1688 #endif