Beef up unit tests for SI95 code
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1  // :vi sw=4 ts=4 noet2
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49
50 #include <RIC_message_types.h>          // needed for route manager messages
51
52 #define ALL 1
53 #define SOME 0
54
55 /*
56         Passed to a symtab foreach callback to construct a list of pointers from
57         a current symtab.
58 */
59 typedef struct thing_list {
60         int error;                                      // if a realloc failed, this will be set
61         int nalloc;
62         int nused;
63         void** things;
64         const char** names;
65 } thing_list_t;
66
67 // ---- debugging/testing -------------------------------------------------------------------------
68
69 /*
70         Dump some stats for an endpoint in the RT. This is generally called to
71         verify endpoints after a table load/change.
72
73         This is called by the for-each mechanism of the symtab and the prototype is
74         fixe; we don't really use some of the parms, but have dummy references to
75         keep sonar from complaining.
76 */
77 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
78         int*    counter;
79         endpoint_t* ep;
80
81         if( (ep = (endpoint_t *) thing) == NULL ) {
82                 return;
83         }
84
85         if( (counter = (int *) vcounter) != NULL ) {
86                 (*counter)++;
87         } else {
88                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
89                 return;
90         }
91
92         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
93 }
94
95 /*
96         Called to count meid entries in the table. The meid points to an 'owning' endpoint
97         so we can list what we find
98
99         See note in ep_stats about dummy refs.
100 */
101 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
102         int*    counter;
103         endpoint_t* ep;
104
105         if( (ep = (endpoint_t *) thing) == NULL ) {
106                 return;
107         }
108
109         if( (counter = (int *) vcounter) != NULL ) {
110                 (*counter)++;
111         } else {
112                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
113         }
114
115         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
116 }
117
118 /*
119         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
120         the 'source' information and is added to each message.
121
122         See note above about dummy references.
123 */
124 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
125         endpoint_t* ep;
126         char*   id;
127
128         if( (ep = (endpoint_t *) thing) == NULL ) {
129                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
130                 return;
131         }
132
133         if( (id = (char *) vid) == NULL ) {
134                 id = "missing";
135         }
136
137         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
138                 (long long) time( NULL ),
139                 id,
140                 ep->name,
141                 ep->open,
142                 ep->scounts[EPSC_GOOD],
143                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
144                 ep->scounts[EPSC_FAIL],
145                 ep->scounts[EPSC_TRANS]   );
146 }
147
148 /*
149         Dump stats for a route entry in the table.
150 */
151 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
152         int*    counter;
153         rtable_ent_t const* rte;                // thing is really an rte
154         int             mtype;
155         int             sid;
156
157         if( (rte = (rtable_ent_t *) thing) == NULL ) {
158                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
159                 return;
160         }
161
162         if( (counter = (int *) vcounter) != NULL ) {
163                 (*counter)++;
164         }
165
166         mtype = rte->key & 0xffff;
167         sid = (int) (rte->key >> 32);
168
169         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
170 }
171
172 /*
173         Given a route table, cause some stats to be spit out.
174 */
175 static void  rt_stats( route_table_t* rt ) {
176         int* counter;
177
178         if( rt == NULL ) {
179                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
180                 return;
181         }
182
183         counter = (int *) malloc( sizeof( int ) );
184         *counter = 0;
185         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
186         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
187         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
188         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
189
190         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
191         *counter = 0;
192         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
193         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
194
195         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
196         *counter = 0;
197         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
198         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
199
200         free( counter );
201 }
202
203 /*
204         Given a route table, cause endpoint counters to be written to stderr. The id
205         parm is written as the "source" in the output.
206 */
207 static void  rt_epcounts( route_table_t* rt, char* id ) {
208         if( rt == NULL ) {
209                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
210                 return;
211         }
212
213         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
214 }
215
216
217 static void dump_tables( uta_ctx_t *ctx ) {
218         if( ctx->old_rtable != NULL ) {
219                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
220                 rt_stats( ctx->old_rtable );
221         } else {
222                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
223         }
224         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
225         rt_stats( ctx->rtable );
226 }
227
228 // ------------ route manager communication -------------------------------------------------
229 /*
230         Send a request for a table update to the route manager. Updates come in
231         async, so send and go.
232
233         pctx is the private context for the thread; ctx is the application context
234         that we need to be able to send the application ID in case rt mgr needs to
235         use it to idenfity us.
236
237         Returns 0 if we were not able to send a request.
238 */
239 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
240         rmr_mbuf_t*     smsg;
241         int     state = 0;
242
243         if( ctx->rtg_whid < 0 ) {
244                 return state;
245         }
246
247         smsg = rmr_alloc_msg( pctx, 1024 );
248         if( smsg != NULL ) {
249                 smsg->mtype = RMRRM_REQ_TABLE;
250                 smsg->sub_id = 0;
251                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
252                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
253                 smsg->len = strlen( smsg->payload ) + 1;
254
255                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
256                 if( (state = smsg->state) != RMR_OK ) {
257                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
258                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
259                         ctx->rtg_whid = -1;
260                 }
261
262                 rmr_free_msg( smsg );
263         }
264
265         return state;
266 }
267
268 /*
269         Send an ack to the route table manager for a table ID that we are
270         processing.      State is 1 for OK, and 0 for failed. Reason might
271         be populated if we know why there was a failure.
272
273         Context should be the PRIVATE context that we use for messages
274         to route manger and NOT the user's context.
275
276         If a message buffere is passed we use that and use return to sender
277         assuming that this might be a response to a call and that is needed
278         to send back to the proper calling thread. If msg is nil, we allocate
279         and use it.
280 */
281 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
282         int             use_rts = 1;
283         int             payload_size = 1024;
284
285         if( ctx == NULL || ctx->rtg_whid < 0 ) {
286                 return;
287         }
288
289         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
290                 return;
291         }
292
293         if( smsg != NULL ) {
294                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
295         } else {
296                 use_rts = 0;
297                 smsg = rmr_alloc_msg( ctx, payload_size );
298         }
299
300         if( smsg != NULL ) {
301                 smsg->mtype = RMRRM_TABLE_STATE;
302                 smsg->sub_id = -1;
303                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
304                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
305
306                 smsg->len = strlen( smsg->payload ) + 1;
307
308                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
309                 if( use_rts ) {
310                         smsg = rmr_rts_msg( ctx, smsg );
311                 } else {
312                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
313                 }
314                 if( (state = smsg->state) != RMR_OK ) {
315                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
316                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
317                         ctx->rtg_whid = -1;
318                 }
319
320                 if( ! use_rts ) {
321                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
322                 }
323         }
324 }
325
326 // ---- alarm generation --------------------------------------------------------------------------
327
328 /*
329         Given the user's context (not the thread private context) look to see if the application isn't
330         working fast enough and we're dropping messages. If the drop counter has changed since the last
331         peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
332         set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
333
334         The private context is what we use to send so as not to interfere with the user flow.
335 */
336 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
337         static  int alarm_raised = 0;
338         static  int ok2clear = 0;                                       // time that we can clear
339         static  int lastd = 0;                                          // the last counter value so we can compute delta
340         static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
341
342         rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
343         if( ! alarm_raised ) {
344                 if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
345                         return;
346                 }
347
348                 alarm_raised = 1;
349                 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
350                 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
351         } else {
352                 if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
353                         lastd = uctx->dcount;
354                         ok2clear = 0;                                                   // reset the timer
355                         return;
356                 }
357
358                 if( ok2clear == 0 ) {                                           // first round where not dropping
359                         ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
360                 } else {
361                         if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
362                                 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
363                                 alarm_raised = 0;
364                                 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
365                                 prob_id++;
366                         }
367                 }
368         }
369 }
370
371 // ---- utility -----------------------------------------------------------------------------------
372 /*
373         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
374         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
375 */
376 static char* clip( char* buf ) {
377         char*   tok;
378
379         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
380                 buf++;
381         }
382
383         if( (tok = strchr( buf, '#' )) != NULL ) {
384                 if( tok == buf ) {
385                         return buf;                                     // just push back; leading comment sym handled there
386                 }
387
388                 if( isspace( *(tok-1) ) ) {
389                         *tok = 0;
390                 }
391         }
392
393         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
394         *(tok+1) = 0;
395
396         return buf;
397 }
398
399 /*
400         This accepts a pointer to a nil terminated string, and ensures that there is a
401         newline as the last character. If there is not, a new buffer is allocated and
402         the newline is added.  If a new buffer is allocated, the buffer passed in is
403         freed.  The function returns a pointer which the caller should use, and must
404         free.  In the event of an error, a nil pointer is returned.
405 */
406 static char* ensure_nlterm( char* buf ) {
407         char*   nb = NULL;
408         int             len = 0;
409
410         if( buf != NULL ) {
411                 len = strlen( buf );
412         }
413
414         nb = buf;                                                       // default to returning original as is
415         switch( len ) {
416                 case 0:
417                         nb = strdup( "\n" );
418                         break;
419
420                 case 1:
421                         if( *buf != '\n' ) {            // not a newline; realloc
422                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
423                                 nb = strdup( " \n" );
424                                 *nb = *buf;
425                                 free( buf );
426                         }
427                         break;
428
429                 default:
430                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
431                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
432                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
433                                         memcpy( nb, buf, len );
434                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
435                                         *(nb+len+1) = 0;
436                                         free( buf );
437                                 }
438                         }
439                         break;
440         }
441
442         return nb;
443 }
444
445 /*
446         Roll the new table into the active and the active into the old table. We
447         must have the lock on the active table to do this. It's possible that there
448         is no active table (first load), so we have to account for that (no locking).
449 */
450 static void roll_tables( uta_ctx_t* ctx ) {
451
452         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
453                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
454                 ctx->old_rtable = ctx->new_rtable;
455                 ctx->new_rtable = NULL;
456                 return;
457         }
458
459         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
460                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
461                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
462                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
463                 pthread_mutex_unlock( ctx->rtgate );
464         } else {
465                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
466                 ctx->rtable = ctx->new_rtable;                          // make new the active one
467         }
468
469         ctx->new_rtable = NULL;
470 }
471
472 /*
473         Given a thing list, extend the array of pointers by 1/2 of the current
474         number allocated. If we cannot realloc an array, then we set the error
475         flag.  Unlikely, but will prevent a crash, AND will prevent us from
476         trying to use the table since we couldn't capture everything.
477 */
478 static void extend_things( thing_list_t* tl ) {
479         int old_alloc;
480         void*   old_things;
481         void*   old_names;
482
483         if( tl == NULL ) {
484                 return;
485         }
486
487         old_alloc = tl->nalloc;                         // capture current things
488         old_things = tl->things;
489         old_names = tl->names;
490
491         tl->nalloc += tl->nalloc/2;                     // new allocation size
492
493         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
494         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
495
496         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
497                 tl->error = 1;
498                 return;
499         }
500
501         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
502         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
503
504         free( old_things );
505         free( old_names );
506 }
507
508 // ------------ entry update functions ---------------------------------------------------------------
509 /*
510         Given a message type create a route table entry and add to the hash keyed on the
511         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
512         is the number of group slots to allocate in the entry.
513 */
514 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
515         rtable_ent_t* rte;
516         rtable_ent_t* old_rte;          // entry which was already in the table for the key
517
518         if( rt == NULL ) {
519                 return NULL;
520         }
521
522         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
523                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
524                 return NULL;
525         }
526         memset( rte, 0, sizeof( *rte ) );
527         rte->refs = 1;
528         rte->key = key;
529
530         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
531                 nrrgroups = 10;
532         }
533
534         if( nrrgroups ) {
535                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
536                         free( rte );
537                         return NULL;
538                 }
539                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
540         } else {
541                 rte->rrgroups = NULL;
542         }
543
544         rte->nrrgroups = nrrgroups;
545
546         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
547                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
548         }
549
550         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
551
552         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
553         return rte;
554 }
555
556 /*
557         This accepts partially parsed information from an rte or mse record sent by route manager or read from
558         a file such that:
559                 ts_field is the msg-type,sender field
560                 subid is the integer subscription id
561                 rr_field is the endpoint information for round robening message over
562
563         If all goes well, this will add an RTE to the table under construction.
564
565         The ts_field is checked to see if we should ingest this record. We ingest if one of
566         these is true:
567                 there is no sender info (a generic entry for all)
568                 there is sender and our host:port matches one of the senders
569                 the sender info is an IP address that matches one of our IP addresses
570 */
571 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
572         rtable_ent_t*   rte;            // route table entry added
573         char const*     tok;
574         int             ntoks;
575         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
576         char*   tokens[128];
577         char*   gtokens[64];
578         int             ngtoks;                         // number of tokens in the group list
579         int             grp;                            // index into group list
580         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
581         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
582
583         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
584         rr_field = clip( rr_field );
585
586         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
587                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
588                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
589
590                 key = build_rt_key( subid, atoi( ts_field ) );
591
592                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
593
594                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
595                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
596                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
597                         }
598                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
599                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
600
601                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
602                                 int             i;                                      // avoid sonar grumbling by defining this here
603
604                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
605                                         for( i = 0; i < ntoks; i++ ) {
606                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
607                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
608                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
609                                                         has_ep = TRUE;
610                                                 }
611                                         }
612                                         if( has_ep ) {
613                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
614                                                 has_ep = FALSE;
615                                         }
616                                 }
617                         }
618                 }
619         } else {
620                 if( DEBUG || (vlevel > 2) ) {
621                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
622                 }
623         }
624 }
625
626 /*
627         Trash_entry takes a partially parsed record from the input and
628         will delete the entry if the sender,mtype matches us or it's a
629         generic mtype. The refernce in the new table is removed and the
630         refcounter for the actual rte is decreased. If that ref count is
631         0 then the memory is freed (handled byh the del_rte call).
632 */
633 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
634         rtable_ent_t*   rte;            // route table entry to be 'deleted'
635         char const*     tok;
636         int             ntoks;
637         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
638         char*   tokens[128];
639
640         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
641                 return;
642         }
643
644         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
645
646         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
647                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
648                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
649
650                 key = build_rt_key( subid, atoi( ts_field ) );
651                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
652                 if( rte != NULL ) {
653                         if( DEBUG || (vlevel > 1) ) {
654                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
655                         }
656                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
657                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
658                 } else {
659                         if( DEBUG || (vlevel > 1) ) {
660                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
661                         }
662                 }
663         } else {
664                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
665         }
666 }
667
668 // -------------------------- parse functions --------------------------------------------------
669
670 /*
671         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
672         the 'owner' which should be the dns name or IP address of an enpoint
673         the meid_list is a space separated list of me IDs
674
675         This function assumes the caller has vetted the pointers as needed.
676
677         For each meid in the list, an entry is pushed into the hash which references the owner
678         endpoint such that when the meid is used to route a message it references the endpoint
679         to send messages to.
680 */
681 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
682         char const*     tok;
683         int             ntoks;
684         char*   tokens[128];
685         int             i;
686         int             state;
687         endpoint_t*     ep;                                             // endpoint struct for the owner
688
689         owner = clip( owner );                          // ditch extra whitespace and trailing comments
690         meid_list = clip( meid_list );
691
692         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
693         for( i = 0; i < ntoks; i++ ) {
694                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
695                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
696                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
697                 } else {
698                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
699                 }
700         }
701 }
702
703 /*
704         Given the tokens from an mme_del, delete the listed meid entries from the new
705         table. The list is a space separated list of meids.
706
707         The meids in the hash reference endpoints which are never deleted and so
708         the only thing that we need to do here is to remove the meid from the hash.
709
710         This function assumes the caller has vetted the pointers as needed.
711 */
712 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
713         char const*     tok;
714         int             ntoks;
715         char*   tokens[128];
716         int             i;
717
718         if( rtab->hash == NULL ) {
719                 return;
720         }
721
722         meid_list = clip( meid_list );
723
724         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
725         for( i = 0; i < ntoks; i++ ) {
726                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
727                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
728         }
729 }
730
731 /*
732         Parse a partially parsed meid record. Tokens[0] should be one of:
733                 meid_map, mme_ar, mme_del.
734
735         pctx is the private context needed to return an ack/nack using the provided
736         message buffer with the route managers address info.
737 */
738 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
739         char wbuf[1024];
740
741         if( tokens == NULL || ntoks < 1 ) {
742                 return;                                                 // silent but should never happen
743         }
744
745         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
746                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
747                 return;
748         }
749
750         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
751                 tokens[1] = clip( tokens[1] );
752                 if( *(tokens[1]) == 's' ) {
753                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
754                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
755                                 uta_rt_drop( ctx->new_rtable );
756                                 ctx->new_rtable = NULL;
757                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
758                         }
759
760                         if( ctx->table_id != NULL ) {
761                                 free( ctx->table_id );
762                         }
763                         if( ntoks > 2 ) {
764                                 ctx->table_id = strdup( clip( tokens[2] ) );
765                         } else {
766                                 ctx->table_id = NULL;
767                         }
768
769                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
770                         ctx->new_rtable->mupdates = 0;
771
772                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
773                 } else {
774                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
775                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
776                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
777                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
778                                                                 ctx->new_rtable->mupdates, tokens[2] );
779                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
780                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
781                                                 uta_rt_drop( ctx->new_rtable );
782                                                 ctx->new_rtable = NULL;
783                                                 return;
784                                         }
785
786                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
787                                 }
788
789                                 if( ctx->new_rtable ) {
790                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
791                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
792                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
793
794                                         if( vlevel > 0 ) {
795                                                 if( ctx->old_rtable != NULL ) {
796                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
797                                                         rt_stats( ctx->old_rtable );
798                                                 } else {
799                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
800                                                 }
801                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
802                                                 rt_stats( ctx->rtable );
803                                         }
804                                 } else {
805                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
806                                         ctx->new_rtable = NULL;
807                                 }
808                         }
809                 }
810
811                 return;
812         }
813
814         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
815                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
816                 return;
817         }
818
819         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
820                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
821                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
822                         return;
823                 }
824                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
825                 ctx->new_rtable->mupdates++;
826                 return;
827         }
828
829         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
830                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
831                 ctx->new_rtable->mupdates++;
832                 return;
833         }
834 }
835
836 /*
837         This will close the current table snarf file (in *.inc) and open a new one.
838         The curent one is renamed. The final file name is determined by the setting of
839         RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
840         an additional extension of .snarf.  If neither seed or snarf environment vars are
841         set then this does nothing.
842
843         If this is called before the tmp snarf file is opened, then this just opens the file.
844 */
845 static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
846         static int              ok2warn = 0;    // some warnings squelched on first call
847
848         char*   seed_fname;                             // the filename from env
849         char    tfname[512];                    // temp fname
850         char    wfname[512];                    // working buffer for filename
851         char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
852
853         if( ctx == NULL ) {
854                 return;
855         }
856
857         if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
858                 if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
859                         memset( wfname, 0, sizeof( wfname ) );
860                         snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
861                         snarf_fname = wfname;
862                 }
863         }
864
865         if( snarf_fname == NULL ) {
866                 rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
867                 return;
868         }
869
870         memset( tfname, 0, sizeof( tfname ) );
871         snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
872
873         if( ctx->snarf_rt_fd >= 0 ) {
874                 char* msg= "### captured from route manager\n";
875                 write( ctx->snarf_rt_fd, msg, strlen( msg ) );
876                 if( close( ctx->snarf_rt_fd ) < 0 ) {
877                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
878                         return;
879                 }
880
881                 if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
882                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
883                 }
884
885                 if( rename( tfname, snarf_fname ) ) {
886                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
887                 } else {
888                         rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
889                 }
890         }
891         ok2warn = 1;
892
893         ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
894         if( ctx->snarf_rt_fd < 0 ) {
895                 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
896         } else {
897                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
898         }
899 }
900
901 /*
902         Parse a single record recevied from the route table generator, or read
903         from a static route table file.  Start records cause a new table to
904         be started (if a partial table was received it is discarded. Table
905         entry records are added to the currenly 'in progress' table, and an
906         end record causes the in progress table to be finalised and the
907         currently active table is replaced.
908
909         The updated table will be activated when the *|end record is encountered.
910         However, to allow for a "double" update, where both the meid map and the
911         route table must be updated at the same time, the end indication on a
912         route table (new or update) may specifiy "hold" which indicates that meid
913         map entries are to follow and the updated route table should be held as
914         pending until the end of the meid map is received and validated.
915
916         CAUTION:  we are assuming that there is a single route/meid map generator
917                 and as such only one type of update is received at a time; in other
918                 words, the sender cannot mix update records and if there is more than
919                 one sender process they must synchronise to avoid issues.
920
921
922         For a RT update, we expect:
923                 newrt | start | <table-id>
924                 newrt | end | <count>
925                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
926                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
927                 mse| <mtype>[,sender] | <sub-id> | %meid
928
929
930         For a meid map update we expect:
931                 meid_map | start | <table-id>
932                 meid_map | end | <count> | <md5-hash>
933                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
934                 mme_del | <meid0> <meid1>...<meidn>
935
936
937         The pctx is our private context that must be used to send acks/status
938         messages back to the route manager.  The regular ctx is the ctx that
939         the user has been given and thus that's where we have to hang the route
940         table we're working with.
941
942         If mbuf is given, and we need to ack, then we ack using the mbuf and a
943         return to sender call (allows route manager to use wh_call() to send
944         an update and rts is required to get that back to the right thread).
945         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
946         will be used.
947 */
948 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
949         int i;
950         int ntoks;                                                      // number of tokens found in something
951         int ngtoks;
952         int     grp;                                                    // group number
953         rtable_ent_t const*     rte;                    // route table entry added
954         char*   tokens[128];
955         char*   tok;                                            // pointer into a token or string
956         char    wbuf[1024];
957
958         if( ! buf ) {
959                 return;
960         }
961
962         if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
963                 write( ctx->snarf_rt_fd, buf, strlen( buf ) );
964                 write( ctx->snarf_rt_fd, "\n", 1 );
965         }
966
967         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
968                 buf++;
969         }
970         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
971         *(tok+1) = 0;
972
973         memset( tokens, 0, sizeof( tokens ) );
974         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
975                 tokens[0] = clip( tokens[0] );
976                 switch( *(tokens[0]) ) {
977                         case 0:                                                                                                 // ignore blanks
978                                 // fallthrough
979                         case '#':                                                                                               // and comment lines
980                                 break;
981
982                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
983                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
984                                         break;
985                                 }
986
987                                 if( ntoks < 3 ) {
988                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
989                                         break;
990                                 }
991
992                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
993                                 ctx->new_rtable->updates++;
994                                 break;
995
996                         case 'n':                                                                                               // newrt|{start|end}
997                                 tokens[1] = clip( tokens[1] );
998                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
999                                         if( ctx && ctx->snarf_rt_fd >= 0 ) {
1000                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1001                                         }
1002
1003                                         if( ntoks >2 ) {
1004                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1005                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1006                                                                 ctx->new_rtable->updates, tokens[2] );
1007                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
1008                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1009                                                         uta_rt_drop( ctx->new_rtable );
1010                                                         ctx->new_rtable = NULL;
1011                                                         break;
1012                                                 }
1013                                         }
1014
1015                                         if( ctx->new_rtable ) {
1016                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1017                                                 if( DEBUG > 1 || (vlevel > 1) ) {
1018                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
1019                                                         dump_tables( ctx );
1020                                                 }
1021
1022                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1023                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1024                                                 ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
1025                                         } else {
1026                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
1027                                                 ctx->new_rtable = NULL;
1028                                         }
1029                                 } else {                                                                                                                        // start a new table.
1030                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
1031                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1032
1033                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1034                                                 uta_rt_drop( ctx->new_rtable );
1035                                                 ctx->new_rtable = NULL;
1036                                         }
1037
1038                                         if( ctx->table_id != NULL ) {
1039                                                 free( ctx->table_id );
1040                                         }
1041                                         if( ntoks >2 ) {
1042                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1043                                         } else {
1044                                                 ctx->table_id = NULL;
1045                                         }
1046
1047                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
1048                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
1049
1050                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
1051                                 }
1052                                 break;
1053
1054                         case 'm':                                                                       // mse entry or one of the meid_ records
1055                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
1056                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1057                                                 break;
1058                                         }
1059
1060                                         if( ntoks < 4 ) {
1061                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
1062                                                 break;
1063                                         }
1064
1065                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
1066                                         ctx->new_rtable->updates++;
1067                                 } else {
1068                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
1069                                 }
1070                                 break;
1071
1072                         case 'r':                                       // assume rt entry
1073                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1074                                         break;
1075                                 }
1076
1077                                 ctx->new_rtable->updates++;
1078                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
1079                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1080                                 } else {
1081                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
1082                                 }
1083                                 break;
1084
1085                         case 'u':                                                                                               // update current table, not a total replacement
1086                                 if( ! (ctx->flags & CFL_FULLRT) ) {                                     // we cannot update until we have a full table from route generator
1087                                         rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
1088                                         break;
1089                                 }
1090
1091                                 tokens[1] = clip( tokens[1] );
1092                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1093                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
1094                                                 break;
1095                                         }
1096                                         if( ctx->snarf_rt_fd >= 0 ) {
1097                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1098                                         }
1099
1100                                         if( ntoks >2 ) {
1101                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1102                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1103                                                                 ctx->new_rtable->updates, tokens[2] );
1104                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1105                                                         uta_rt_drop( ctx->new_rtable );
1106                                                         ctx->new_rtable = NULL;
1107                                                         break;
1108                                                 }
1109                                         }
1110
1111                                         if( ctx->new_rtable ) {
1112                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1113                                                 if( DEBUG > 1 || (vlevel > 1) )  {
1114                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1115                                                         dump_tables( ctx );
1116                                                 }
1117
1118                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1119                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1120                                         } else {
1121                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1122                                                 ctx->new_rtable = NULL;
1123                                         }
1124                                 } else {                                                                                        // start a new table.
1125                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
1126                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1127                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1128                                                 uta_rt_drop( ctx->new_rtable );
1129                                                 ctx->new_rtable = NULL;
1130                                         }
1131
1132                                         if( ntoks > 2 ) {
1133                                                 if( ctx->table_id != NULL ) {
1134                                                         free( ctx->table_id );
1135                                                 }
1136                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1137                                         }
1138
1139                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1140                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1141
1142                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1143                                 }
1144                                 break;
1145
1146                         default:
1147                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1148                                 break;
1149                 }
1150         }
1151 }
1152
1153 /*
1154         This function attempts to open a static route table in order to create a 'seed'
1155         table during initialisation.  The environment variable RMR_SEED_RT is expected
1156         to contain the necessary path to the file. If missing, or if the file is empty,
1157         no route table will be available until one is received from the generator.
1158
1159         This function is probably most useful for testing situations, or extreme
1160         cases where the routes are static.
1161 */
1162 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1163         int             i;
1164         char*   fname;
1165         char*   fbuf;                           // buffer with file contents
1166         char*   rec;                            // start of the record
1167         char*   eor;                            // end of the record
1168         int             rcount = 0;                     // record count for debug
1169
1170         if( (fname = ctx->seed_rt_fname) == NULL ) {
1171                 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1172                         return;
1173                 }
1174
1175                 ctx->seed_rt_fname = strdup( fname );
1176                 fname = ctx->seed_rt_fname;
1177         }
1178
1179         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1180                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1181                 return;
1182         }
1183
1184         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1185         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1186                 if( *eor == '\r' ) {
1187                         *eor = '\n';                                                            // will look like a blank line which is ok
1188                 }
1189         }
1190
1191         rec = fbuf;
1192         while( rec && *rec ) {
1193                 rcount++;
1194                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1195                         *eor = 0;
1196                 } else {
1197                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1198                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1199                         free( fbuf );
1200                         return;
1201                 }
1202
1203                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1204
1205                 rec = eor+1;
1206         }
1207
1208         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1209         free( fbuf );
1210 }
1211
1212 /*
1213         Callback driven for each thing in a symtab. We collect the pointers to those
1214         things for later use (cloning).
1215 */
1216 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1217         thing_list_t*   tl;
1218
1219         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1220                 return;
1221         }
1222
1223         if( thing == NULL ) {
1224                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1225                 return;
1226         }
1227
1228         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1229         tl->things[tl->nused++] = thing;                // save a reference to the thing
1230
1231         if( tl->nused >= tl->nalloc ) {
1232                 extend_things( tl );                            // not enough allocated
1233         }
1234 }
1235
1236 /*
1237         Called to delete a route table entry struct. We delete the array of endpoint
1238         pointers, but NOT the endpoints referenced as those are referenced from
1239         multiple entries.
1240
1241         Route table entries can be concurrently referenced by multiple symtabs, so
1242         the actual delete happens only if decrementing the rte's ref count takes it
1243         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1244         the symtab, or overlaying an entry.
1245
1246         This function uses ONLY the pointer to the rte (thing) and ignores the other
1247         information that symtab foreach function passes (st, entry, and data) which
1248         means that it _can_ safetly be used outside of the foreach setting. If
1249         the function is changed to depend on any of these three, then a stand-alone
1250         rte_cleanup() function should be added and referenced by this, and refererences
1251         to this outside of the foreach world should be changed.
1252 */
1253 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1254         rtable_ent_t*   rte;
1255         int i;
1256
1257         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1258                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1259                 return;
1260         }
1261
1262         rte->refs--;
1263         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1264                 return;
1265         }
1266
1267         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1268                 for( i = 0; i < rte->nrrgroups; i++ ) {
1269                         if( rte->rrgroups[i] ) {
1270                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1271                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1272                         }
1273
1274                 }
1275
1276                 free( rte->rrgroups );
1277         }
1278
1279         free( rte );                                                                                    // finally, drop the potato
1280 }
1281
1282 /*
1283         Read an entire file into a buffer. We assume for route table files
1284         they will be smallish and so this won't be a problem.
1285         Returns a pointer to the buffer, or nil. Caller must free.
1286         Terminates the buffer with a nil character for string processing.
1287
1288         If we cannot stat the file, we assume it's empty or missing and return
1289         an empty buffer, as opposed to a nil, so the caller can generate defaults
1290         or error if an empty/missing file isn't tolerated.
1291 */
1292 static char* uta_fib( char const* fname ) {
1293         struct stat     stats;
1294         off_t           fsize = 8192;   // size of the file
1295         off_t           nread;                  // number of bytes read
1296         int                     fd;
1297         char*           buf;                    // input buffer
1298
1299         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1300                 if( fstat( fd, &stats ) >= 0 ) {
1301                         if( stats.st_size <= 0 ) {                                      // empty file
1302                                 close( fd );
1303                                 fd = -1;
1304                         } else {
1305                                 fsize = stats.st_size;                                          // stat ok, save the file size
1306                         }
1307                 } else {
1308                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1309                 }
1310         }
1311
1312         if( fd < 0 ) {                                                                                  // didn't open or empty
1313                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1314                         return NULL;
1315                 }
1316
1317                 *buf = 0;
1318                 return buf;
1319         }
1320
1321         // add a size limit check here
1322
1323         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1324                 close( fd );
1325                 errno = ENOMEM;
1326                 return NULL;
1327         }
1328
1329         nread = read( fd, buf, fsize );
1330         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1331                 free( buf );
1332                 errno = EFBIG;                                                                                  // likely too much to handle
1333                 close( fd );
1334                 return NULL;
1335         }
1336
1337         buf[nread] = 0;
1338
1339         close( fd );
1340         return buf;
1341 }
1342
1343 // --------------------- initialisation/creation ---------------------------------------------
1344 /*
1345         Create and initialise a route table; Returns a pointer to the table struct.
1346 */
1347 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1348         route_table_t*  rt;
1349
1350         if( ctx == NULL ) {
1351                 return NULL;
1352         }
1353         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1354                 return NULL;
1355         }
1356
1357         memset( rt, 0, sizeof( *rt ) );
1358
1359         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1360                 free( rt );
1361                 return NULL;
1362         }
1363
1364         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1365         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1366         pthread_mutex_init( rt->gate, NULL );
1367
1368         return rt;
1369 }
1370
1371 /*
1372         Clones one of the spaces in the given table.
1373         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1374         Space is the space in the old table to copy. Space 0 uses an integer key and
1375         references rte structs. All other spaces use a string key and reference endpoints.
1376 */
1377 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1378         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1379         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1380         void*   sst;                    // source symtab
1381         void*   nst;                    // new symtab
1382         thing_list_t things;    // things from the space to copy
1383         int             i;
1384         int             free_on_err = 0;
1385
1386         if( ctx == NULL ) {
1387                 return NULL;
1388         }
1389         if( nrt == NULL ) {                             // make a new table if needed
1390                 free_on_err = 1;
1391                 nrt = uta_rt_init( ctx );
1392                 if( nrt == NULL ) {
1393                         return NULL;
1394                 }
1395         }
1396
1397         if( srt == NULL ) {             // source was nil, just give back the new table
1398                 return nrt;
1399         }
1400
1401         things.nalloc = 2048;
1402         things.nused = 0;
1403         things.error = 0;
1404         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1405         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1406         if( things.things == NULL || things.names == NULL ){
1407                 if( things.things != NULL ) {
1408                         free( things.things );
1409                 }
1410                 if( things.names != NULL ) {
1411                         free( things.names );
1412                 }
1413
1414                 if( free_on_err ) {
1415                         rmr_sym_free( nrt->hash );
1416                         free( nrt );
1417                         nrt = NULL;
1418                 } else {
1419                         nrt->error = 1;
1420                 }
1421
1422                 return nrt;
1423         }
1424         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1425         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1426
1427         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1428         nst = nrt->hash;
1429
1430         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1431         if( things.error ) {                            // something happened and capture failed
1432                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1433                 free( things.things );
1434                 free( things.names );
1435                 if( free_on_err ) {
1436                         rmr_sym_free( nrt->hash );
1437                         free( nrt );
1438                         nrt = NULL;
1439                 } else {
1440                         nrt->error = 1;
1441                 }
1442                 return nrt;
1443         }
1444
1445         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1446         for( i = 0; i < things.nused; i++ ) {
1447                 if( space ) {                                                                                           // string key, epoint reference
1448                         ep = (endpoint_t *) things.things[i];
1449                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1450                 } else {
1451                         rte = (rtable_ent_t *) things.things[i];
1452                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1453                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1454                 }
1455         }
1456
1457         free( things.things );
1458         free( (void *) things.names );
1459         return nrt;
1460 }
1461
1462 /*
1463         Given a destination route table (drt), clone from the source (srt) into it.
1464         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1465         allocate the drt if that was nil too). If all is true (1), then we will clone both
1466         the MT and the ME spaces; otherwise only the ME space is cloned.
1467 */
1468 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1469         endpoint_t*             ep;                             // an endpoint
1470         rtable_ent_t*   rte;                    // a route table entry
1471         int i;
1472
1473         if( ctx == NULL ) {
1474                 return NULL;
1475         }
1476         if( drt == NULL ) {
1477                 drt = uta_rt_init( ctx );
1478         }
1479         if( srt == NULL ) {
1480                 return drt;
1481         }
1482
1483         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1484         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1485         if( all ) {
1486                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1487         }
1488
1489         return drt;
1490 }
1491
1492 /*
1493         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1494         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1495         context to be referenced by the new pointer; the old pointer is set to nil.
1496
1497         If the old table doesn't exist, then a new table is created and the new pointer is
1498         set to reference it.
1499
1500         The ME namespace references endpoints which do not need to be released, therefore we
1501         do not need to run that portion of the table to deref like we do for the RTEs.
1502 */
1503 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1504         int counter = 0;
1505         route_table_t*  rt;
1506
1507         if( ctx == NULL ) {
1508                 return NULL;
1509         }
1510
1511         if( (rt = ctx->old_rtable) != NULL ) {
1512                 ctx->old_rtable = NULL;
1513                 while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
1514                         if( counter++ > 1000 ) {
1515                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1516                                 break;
1517                         }
1518
1519                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1520                 }
1521
1522                 if( rt->hash != NULL ) {
1523                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1524                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1525                 }
1526
1527                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1528         } else {
1529                 rt = NULL;
1530         }
1531
1532         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1533         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1534                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1535         } else {
1536                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1537                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1538                 rt->error = 1;
1539         }
1540
1541         return rt;
1542 }
1543
1544
1545 /*
1546         Given a name, find the endpoint struct in the provided route table.
1547 */
1548 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1549
1550         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1551                 return NULL;
1552         }
1553
1554         return rmr_sym_get( rt->ephash, ep_name, 1 );
1555 }
1556
1557 /*
1558         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1559         Does NOT destroy the gate as it's a common gate for ALL route tables.
1560 */
1561 static void uta_rt_drop( route_table_t* rt ) {
1562         if( rt == NULL ) {
1563                 return;
1564         }
1565
1566         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1567         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1568         free( rt );
1569 }
1570
1571 /*
1572         Look up and return the pointer to the endpoint stuct matching the given name.
1573         If not in the hash, a new endpoint is created, added to the hash. Should always
1574         return a pointer.
1575 */
1576 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1577         endpoint_t*     ep;
1578
1579         if( !rt || !ep_name || ! *ep_name ) {
1580                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1581                 errno = EINVAL;
1582                 return NULL;
1583         }
1584
1585         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1586                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1587                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1588                         errno = ENOMEM;
1589                         return NULL;
1590                 }
1591
1592                 ep->notify = 1;                                                         // show notification on first connection failure
1593                 ep->open = 0;                                                           // not connected
1594                 ep->addr = uta_h2ip( ep_name );
1595                 ep->name = strdup( ep_name );
1596                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1597                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1598
1599                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1600         }
1601
1602         return ep;
1603 }
1604
1605
1606 /*
1607         Given a session id and message type build a key that can be used to look up the rte in the route
1608         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1609 */
1610 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1611         uint64_t key;
1612
1613         if( sub_id == UNSET_SUBID ) {
1614                 key = 0xffffffff00000000 | mtype;
1615         } else {
1616                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1617         }
1618
1619         return key;
1620 }
1621
1622 /*
1623         Given a route table and meid string, find the owner (if known). Returns a pointer to
1624         the endpoint struct or nil.
1625 */
1626 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1627         endpoint_t const* ep;           // the ep we found in the hash
1628
1629         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1630                 return NULL;
1631         }
1632
1633         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1634 }
1635
1636 /*
1637         This returns a pointer to the currently active route table and ups
1638         the reference count so that the route table is not freed while it
1639         is being used. The caller MUST call release_rt() when finished
1640         with the pointer.
1641
1642         Care must be taken: the ctx->rtable pointer _could_ change during the time
1643         between the release of the lock and the return. Therefore we MUST grab
1644         the current pointer when we have the lock so that if it does we don't
1645         return a pointer to the wrong table.
1646
1647         This will return NULL if there is no active table.
1648 */
1649 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1650         route_table_t*  rrt;                    // return value
1651
1652         if( ctx == NULL || ctx->rtable == NULL ) {
1653                 return NULL;
1654         }
1655
1656         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1657         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1658         rrt->ref_count++;
1659         pthread_mutex_unlock( ctx->rtgate );
1660
1661         return rrt;                                                                             // pointer we upped the count with
1662 }
1663
1664 /*
1665         This will "release" the route table by reducing the use counter
1666         in the table. The table may not be freed until the counter reaches
1667         0, so it's imparative that the pointer be "released" when it is
1668         fetched by get_rt().  Once the caller has released the table it
1669         may not safely use the pointer that it had.
1670 */
1671 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1672         if( ctx == NULL || rt == NULL ) {
1673                 return;
1674         }
1675
1676         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1677         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1678                 rt->ref_count--;
1679         }
1680         pthread_mutex_unlock( ctx->rtgate );
1681 }
1682 #endif