Fixed some static code errors
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49
50 #include <RIC_message_types.h>          // needed for route manager messages
51
52 #define ALL 1
53 #define SOME 0
54
55 /*
56         Passed to a symtab foreach callback to construct a list of pointers from
57         a current symtab.
58 */
59 typedef struct thing_list {
60         int error;                                      // if a realloc failed, this will be set
61         int nalloc;
62         int nused;
63         void** things;
64         const char** names;
65 } thing_list_t;
66
67 // ---- debugging/testing -------------------------------------------------------------------------
68
69 /*
70         Dump some stats for an endpoint in the RT. This is generally called to
71         verify endpoints after a table load/change.
72
73         This is called by the for-each mechanism of the symtab and the prototype is
74         fixe; we don't really use some of the parms, but have dummy references to
75         keep sonar from complaining.
76 */
77 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
78         int*    counter;
79         endpoint_t* ep;
80
81         if( (ep = (endpoint_t *) thing) == NULL ) {
82                 return;
83         }
84
85         if( (counter = (int *) vcounter) != NULL ) {
86                 (*counter)++;
87         } else {
88                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
89                 return;
90         }
91
92         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
93 }
94
95 /*
96         Called to count meid entries in the table. The meid points to an 'owning' endpoint
97         so we can list what we find
98
99         See note in ep_stats about dummy refs.
100 */
101 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
102         int*    counter;
103         endpoint_t* ep;
104
105         if( (ep = (endpoint_t *) thing) == NULL ) {
106                 return;
107         }
108
109         if( (counter = (int *) vcounter) != NULL ) {
110                 (*counter)++;
111         } else {
112                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
113         }
114
115         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
116 }
117
118 /*
119         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
120         the 'source' information and is added to each message.
121
122         See note above about dummy references.
123 */
124 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
125         endpoint_t* ep;
126         char*   id;
127
128         if( (ep = (endpoint_t *) thing) == NULL ) {
129                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
130                 return;
131         }
132
133         if( (id = (char *) vid) == NULL ) {
134                 id = "missing";
135         }
136
137         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
138                 (long long) time( NULL ),
139                 id,
140                 ep->name,
141                 ep->open,
142                 ep->scounts[EPSC_GOOD],
143                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
144                 ep->scounts[EPSC_FAIL],
145                 ep->scounts[EPSC_TRANS]   );
146 }
147
148 /*
149         Dump stats for a route entry in the table.
150 */
151 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
152         int*    counter;
153         rtable_ent_t const* rte;                // thing is really an rte
154         int             mtype;
155         int             sid;
156
157         if( (rte = (rtable_ent_t *) thing) == NULL ) {
158                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
159                 return;
160         }
161
162         if( (counter = (int *) vcounter) != NULL ) {
163                 (*counter)++;
164         }
165
166         mtype = rte->key & 0xffff;
167         sid = (int) (rte->key >> 32);
168
169         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
170 }
171
172 /*
173         Given a route table, cause some stats to be spit out.
174 */
175 static void  rt_stats( route_table_t* rt ) {
176         int* counter;
177
178         if( rt == NULL ) {
179                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
180                 return;
181         }
182
183         counter = (int *) malloc( sizeof( int ) );
184         *counter = 0;
185         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
186         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
187         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
188         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
189
190         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
191         *counter = 0;
192         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
193         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
194
195         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
196         *counter = 0;
197         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
198         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
199
200         free( counter );
201 }
202
203 /*
204         Given a route table, cause endpoint counters to be written to stderr. The id
205         parm is written as the "source" in the output.
206 */
207 static void  rt_epcounts( route_table_t* rt, char* id ) {
208         if( rt == NULL ) {
209                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
210                 return;
211         }
212
213         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
214 }
215
216
217 static void dump_tables( uta_ctx_t *ctx ) {
218         if( ctx->old_rtable != NULL ) {
219                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
220                 rt_stats( ctx->old_rtable );
221         } else {
222                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
223         }
224         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
225         rt_stats( ctx->rtable );
226 }
227
228 // ------------ route manager communication -------------------------------------------------
229 /*
230         Send a request for a table update to the route manager. Updates come in
231         async, so send and go.
232
233         pctx is the private context for the thread; ctx is the application context
234         that we need to be able to send the application ID in case rt mgr needs to
235         use it to idenfity us.
236
237         Returns 0 if we were not able to send a request.
238 */
239 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
240         rmr_mbuf_t*     smsg;
241         int     state = 0;
242
243         if( ctx->rtg_whid < 0 ) {
244                 return state;
245         }
246
247         smsg = rmr_alloc_msg( pctx, 1024 );
248         if( smsg != NULL ) {
249                 smsg->mtype = RMRRM_REQ_TABLE;
250                 smsg->sub_id = 0;
251                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
252                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
253                 smsg->len = strlen( smsg->payload ) + 1;
254
255                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
256                 if( (state = smsg->state) != RMR_OK ) {
257                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
258                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
259                         ctx->rtg_whid = -1;
260                 }
261
262                 rmr_free_msg( smsg );
263         }
264
265         return state;
266 }
267
268 /*
269         Send an ack to the route table manager for a table ID that we are
270         processing.      State is 1 for OK, and 0 for failed. Reason might
271         be populated if we know why there was a failure.
272
273         Context should be the PRIVATE context that we use for messages
274         to route manger and NOT the user's context.
275
276         If a message buffere is passed we use that and use return to sender
277         assuming that this might be a response to a call and that is needed
278         to send back to the proper calling thread. If msg is nil, we allocate
279         and use it.
280 */
281 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
282         int             use_rts = 1;
283         int             payload_size = 1024;
284
285         if( ctx == NULL || ctx->rtg_whid < 0 ) {
286                 return;
287         }
288
289         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
290                 return;
291         }
292
293         if( smsg != NULL ) {
294                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
295         } else {
296                 use_rts = 0;
297                 smsg = rmr_alloc_msg( ctx, payload_size );
298         }
299
300         if( smsg != NULL ) {
301                 smsg->mtype = RMRRM_TABLE_STATE;
302                 smsg->sub_id = -1;
303                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
304                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
305
306                 smsg->len = strlen( smsg->payload ) + 1;
307
308                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
309                 if( use_rts ) {
310                         smsg = rmr_rts_msg( ctx, smsg );
311                 } else {
312                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
313                 }
314                 if( (state = smsg->state) != RMR_OK ) {
315                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
316                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
317                         ctx->rtg_whid = -1;
318                 }
319
320                 if( ! use_rts ) {
321                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
322                 }
323         }
324 }
325
326 // ---- alarm generation --------------------------------------------------------------------------
327
328 /*
329         Given the user's context (not the thread private context) look to see if the application isn't
330         working fast enough and we're dropping messages. If the drop counter has changed since the last
331         peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
332         set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
333
334         The private context is what we use to send so as not to interfere with the user flow.
335 */
336 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
337         static  int alarm_raised = 0;
338         static  int ok2clear = 0;                                       // time that we can clear
339         static  int lastd = 0;                                          // the last counter value so we can compute delta
340         static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
341
342         rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
343         if( ! alarm_raised ) {
344                 if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
345                         return;
346                 }
347
348                 alarm_raised = 1;
349                 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
350                 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
351         } else {
352                 if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
353                         lastd = uctx->dcount;
354                         ok2clear = 0;                                                   // reset the timer
355                         return;
356                 }
357
358                 if( ok2clear == 0 ) {                                           // first round where not dropping
359                         ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
360                 } else {
361                         if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
362                                 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
363                                 alarm_raised = 0;
364                                 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id++, "RMR message dropping has stopped" );
365                         }
366                 }
367         }
368 }
369
370 // ---- utility -----------------------------------------------------------------------------------
371 /*
372         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
373         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
374 */
375 static char* clip( char* buf ) {
376         char*   tok;
377
378         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
379                 buf++;
380         }
381
382         if( (tok = strchr( buf, '#' )) != NULL ) {
383                 if( tok == buf ) {
384                         return buf;                                     // just push back; leading comment sym handled there
385                 }
386
387                 if( isspace( *(tok-1) ) ) {
388                         *tok = 0;
389                 }
390         }
391
392         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
393         *(tok+1) = 0;
394
395         return buf;
396 }
397
398 /*
399         This accepts a pointer to a nil terminated string, and ensures that there is a
400         newline as the last character. If there is not, a new buffer is allocated and
401         the newline is added.  If a new buffer is allocated, the buffer passed in is
402         freed.  The function returns a pointer which the caller should use, and must
403         free.  In the event of an error, a nil pointer is returned.
404 */
405 static char* ensure_nlterm( char* buf ) {
406         char*   nb = NULL;
407         int             len = 0;
408
409         if( buf != NULL ) {
410                 len = strlen( buf );
411         }
412
413         nb = buf;                                                       // default to returning original as is
414         switch( len ) {
415                 case 0:
416                         nb = strdup( "\n" );
417                         break;
418
419                 case 1:
420                         if( *buf != '\n' ) {            // not a newline; realloc
421                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
422                                 nb = strdup( " \n" );
423                                 *nb = *buf;
424                                 free( buf );
425                         }
426                         break;
427
428                 default:
429                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
430                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
431                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
432                                         memcpy( nb, buf, len );
433                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
434                                         *(nb+len+1) = 0;
435                                         free( buf );
436                                 }
437                         }
438                         break;
439         }
440
441         return nb;
442 }
443
444 /*
445         Roll the new table into the active and the active into the old table. We
446         must have the lock on the active table to do this. It's possible that there
447         is no active table (first load), so we have to account for that (no locking).
448 */
449 static void roll_tables( uta_ctx_t* ctx ) {
450
451         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
452                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
453                 ctx->old_rtable = ctx->new_rtable;
454                 ctx->new_rtable = NULL;
455                 return;
456         }
457
458         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
459                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
460                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
461                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
462                 pthread_mutex_unlock( ctx->rtgate );
463         } else {
464                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
465                 ctx->rtable = ctx->new_rtable;                          // make new the active one
466         }
467
468         ctx->new_rtable = NULL;
469 }
470
471 /*
472         Given a thing list, extend the array of pointers by 1/2 of the current
473         number allocated. If we cannot realloc an array, then we set the error
474         flag.  Unlikely, but will prevent a crash, AND will prevent us from
475         trying to use the table since we couldn't capture everything.
476 */
477 static void extend_things( thing_list_t* tl ) {
478         int old_alloc;
479         void*   old_things;
480         void*   old_names;
481
482         if( tl == NULL ) {
483                 return;
484         }
485
486         old_alloc = tl->nalloc;                         // capture current things
487         old_things = tl->things;
488         old_names = tl->names;
489
490         tl->nalloc += tl->nalloc/2;                     // new allocation size
491
492         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
493         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
494
495         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
496                 tl->error = 1;
497                 return;
498         }
499
500         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
501         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
502
503         free( old_things );
504         free( old_names );
505 }
506
507 // ------------ entry update functions ---------------------------------------------------------------
508 /*
509         Given a message type create a route table entry and add to the hash keyed on the
510         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
511         is the number of group slots to allocate in the entry.
512 */
513 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
514         rtable_ent_t* rte;
515         rtable_ent_t* old_rte;          // entry which was already in the table for the key
516
517         if( rt == NULL ) {
518                 return NULL;
519         }
520
521         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
522                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
523                 return NULL;
524         }
525         memset( rte, 0, sizeof( *rte ) );
526         rte->refs = 1;
527         rte->key = key;
528
529         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
530                 nrrgroups = 10;
531         }
532
533         if( nrrgroups ) {
534                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
535                         free( rte );
536                         return NULL;
537                 }
538                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
539         } else {
540                 rte->rrgroups = NULL;
541         }
542
543         rte->nrrgroups = nrrgroups;
544
545         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
546                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
547         }
548
549         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
550
551         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
552         return rte;
553 }
554
555 /*
556         This accepts partially parsed information from an rte or mse record sent by route manager or read from
557         a file such that:
558                 ts_field is the msg-type,sender field
559                 subid is the integer subscription id
560                 rr_field is the endpoint information for round robening message over
561
562         If all goes well, this will add an RTE to the table under construction.
563
564         The ts_field is checked to see if we should ingest this record. We ingest if one of
565         these is true:
566                 there is no sender info (a generic entry for all)
567                 there is sender and our host:port matches one of the senders
568                 the sender info is an IP address that matches one of our IP addresses
569 */
570 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
571         rtable_ent_t*   rte;            // route table entry added
572         char const*     tok;
573         int             ntoks;
574         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
575         char*   tokens[128];
576         char*   gtokens[64];
577         int             i;
578         int             ngtoks;                         // number of tokens in the group list
579         int             grp;                            // index into group list
580         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
581         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
582
583         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
584         rr_field = clip( rr_field );
585
586         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
587                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
588                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
589
590                 key = build_rt_key( subid, atoi( ts_field ) );
591
592                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
593
594                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
595                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
596                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
597                         }
598                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
599                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
600
601                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
602                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
603                                         for( i = 0; i < ntoks; i++ ) {
604                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
605                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
606                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
607                                                         has_ep = TRUE;
608                                                 }
609                                         }
610                                         if( has_ep ) {
611                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
612                                                 has_ep = FALSE;
613                                         }
614                                 }
615                         }
616                 }
617         } else {
618                 if( DEBUG || (vlevel > 2) ) {
619                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
620                 }
621         }
622 }
623
624 /*
625         Trash_entry takes a partially parsed record from the input and
626         will delete the entry if the sender,mtype matches us or it's a
627         generic mtype. The refernce in the new table is removed and the
628         refcounter for the actual rte is decreased. If that ref count is
629         0 then the memory is freed (handled byh the del_rte call).
630 */
631 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
632         rtable_ent_t*   rte;            // route table entry to be 'deleted'
633         char const*     tok;
634         int             ntoks;
635         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
636         char*   tokens[128];
637
638         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
639                 return;
640         }
641
642         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
643
644         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
645                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
646                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
647
648                 key = build_rt_key( subid, atoi( ts_field ) );
649                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
650                 if( rte != NULL ) {
651                         if( DEBUG || (vlevel > 1) ) {
652                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
653                         }
654                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
655                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
656                 } else {
657                         if( DEBUG || (vlevel > 1) ) {
658                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
659                         }
660                 }
661         } else {
662                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
663         }
664 }
665
666 // -------------------------- parse functions --------------------------------------------------
667
668 /*
669         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
670         the 'owner' which should be the dns name or IP address of an enpoint
671         the meid_list is a space separated list of me IDs
672
673         This function assumes the caller has vetted the pointers as needed.
674
675         For each meid in the list, an entry is pushed into the hash which references the owner
676         endpoint such that when the meid is used to route a message it references the endpoint
677         to send messages to.
678 */
679 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
680         char const*     tok;
681         int             ntoks;
682         char*   tokens[128];
683         int             i;
684         int             state;
685         endpoint_t*     ep;                                             // endpoint struct for the owner
686
687         owner = clip( owner );                          // ditch extra whitespace and trailing comments
688         meid_list = clip( meid_list );
689
690         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
691         for( i = 0; i < ntoks; i++ ) {
692                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
693                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
694                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
695                 } else {
696                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
697                 }
698         }
699 }
700
701 /*
702         Given the tokens from an mme_del, delete the listed meid entries from the new
703         table. The list is a space separated list of meids.
704
705         The meids in the hash reference endpoints which are never deleted and so
706         the only thing that we need to do here is to remove the meid from the hash.
707
708         This function assumes the caller has vetted the pointers as needed.
709 */
710 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
711         char const*     tok;
712         int             ntoks;
713         char*   tokens[128];
714         int             i;
715
716         if( rtab->hash == NULL ) {
717                 return;
718         }
719
720         meid_list = clip( meid_list );
721
722         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
723         for( i = 0; i < ntoks; i++ ) {
724                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
725                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
726         }
727 }
728
729 /*
730         Parse a partially parsed meid record. Tokens[0] should be one of:
731                 meid_map, mme_ar, mme_del.
732
733         pctx is the private context needed to return an ack/nack using the provided
734         message buffer with the route managers address info.
735 */
736 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
737         char wbuf[1024];
738
739         if( tokens == NULL || ntoks < 1 ) {
740                 return;                                                 // silent but should never happen
741         }
742
743         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
744                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
745                 return;
746         }
747
748         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
749                 tokens[1] = clip( tokens[1] );
750                 if( *(tokens[1]) == 's' ) {
751                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
752                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
753                                 uta_rt_drop( ctx->new_rtable );
754                                 ctx->new_rtable = NULL;
755                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
756                         }
757
758                         if( ctx->table_id != NULL ) {
759                                 free( ctx->table_id );
760                         }
761                         if( ntoks > 2 ) {
762                                 ctx->table_id = strdup( clip( tokens[2] ) );
763                         } else {
764                                 ctx->table_id = NULL;
765                         }
766
767                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
768                         ctx->new_rtable->mupdates = 0;
769
770                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
771                 } else {
772                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
773                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
774                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
775                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
776                                                                 ctx->new_rtable->mupdates, tokens[2] );
777                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
778                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
779                                                 uta_rt_drop( ctx->new_rtable );
780                                                 ctx->new_rtable = NULL;
781                                                 return;
782                                         }
783
784                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
785                                 }
786
787                                 if( ctx->new_rtable ) {
788                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
789                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
790                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
791
792                                         if( vlevel > 0 ) {
793                                                 if( ctx->old_rtable != NULL ) {
794                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
795                                                         rt_stats( ctx->old_rtable );
796                                                 } else {
797                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
798                                                 }
799                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
800                                                 rt_stats( ctx->rtable );
801                                         }
802                                 } else {
803                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
804                                         ctx->new_rtable = NULL;
805                                 }
806                         }
807                 }
808
809                 return;
810         }
811
812         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
813                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
814                 return;
815         }
816
817         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
818                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
819                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
820                         return;
821                 }
822                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
823                 ctx->new_rtable->mupdates++;
824                 return;
825         }
826
827         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
828                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
829                 ctx->new_rtable->mupdates++;
830                 return;
831         }
832 }
833
834 /*
835         This will close the current table snarf file (in *.inc) and open a new one.
836         The curent one is renamed. The final file name is determined by the setting of
837         RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
838         an additional extension of .snarf.  If neither seed or snarf environment vars are
839         set then this does nothing.
840
841         If this is called before the tmp snarf file is opened, then this just opens the file.
842 */
843 static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
844         static int              ok2warn = 0;    // some warnings squelched on first call
845
846         char*   seed_fname;                             // the filename from env
847         char    tfname[512];                    // temp fname
848         char    wfname[512];                    // working buffer for filename
849         char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
850
851         if( ctx == NULL ) {
852                 return;
853         }
854
855         if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                 // specific place to stash the rt not given
856                 if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
857                         memset( wfname, 0, sizeof( wfname ) );
858                         snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
859                         snarf_fname = wfname;
860                 }
861         }
862
863         if( snarf_fname == NULL ) {
864                 return;
865         }
866
867         memset( tfname, 0, sizeof( tfname ) );
868         snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
869
870         if( ctx->snarf_rt_fd >= 0 ) {
871                 char* msg= "### captured from route manager\n";
872                 write( ctx->snarf_rt_fd, msg, strlen( msg ) );
873                 if( close( ctx->snarf_rt_fd ) < 0 ) {
874                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
875                         return;
876                 }
877
878                 if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
879                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
880                 }
881
882                 if( rename( tfname, snarf_fname ) ) {
883                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
884                 } else {
885                         rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
886                 }
887         }
888         ok2warn = 1;
889
890         ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
891         if( ctx->snarf_rt_fd < 0 ) {
892                 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
893         } else {
894                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
895         }
896 }
897
898 /*
899         Parse a single record recevied from the route table generator, or read
900         from a static route table file.  Start records cause a new table to
901         be started (if a partial table was received it is discarded. Table
902         entry records are added to the currenly 'in progress' table, and an
903         end record causes the in progress table to be finalised and the
904         currently active table is replaced.
905
906         The updated table will be activated when the *|end record is encountered.
907         However, to allow for a "double" update, where both the meid map and the
908         route table must be updated at the same time, the end indication on a
909         route table (new or update) may specifiy "hold" which indicates that meid
910         map entries are to follow and the updated route table should be held as
911         pending until the end of the meid map is received and validated.
912
913         CAUTION:  we are assuming that there is a single route/meid map generator
914                 and as such only one type of update is received at a time; in other
915                 words, the sender cannot mix update records and if there is more than
916                 one sender process they must synchronise to avoid issues.
917
918
919         For a RT update, we expect:
920                 newrt | start | <table-id>
921                 newrt | end | <count>
922                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
923                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
924                 mse| <mtype>[,sender] | <sub-id> | %meid
925
926
927         For a meid map update we expect:
928                 meid_map | start | <table-id>
929                 meid_map | end | <count> | <md5-hash>
930                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
931                 mme_del | <meid0> <meid1>...<meidn>
932
933
934         The pctx is our private context that must be used to send acks/status
935         messages back to the route manager.  The regular ctx is the ctx that
936         the user has been given and thus that's where we have to hang the route
937         table we're working with.
938
939         If mbuf is given, and we need to ack, then we ack using the mbuf and a
940         return to sender call (allows route manager to use wh_call() to send
941         an update and rts is required to get that back to the right thread).
942         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
943         will be used.
944 */
945 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
946         int i;
947         int ntoks;                                                      // number of tokens found in something
948         int ngtoks;
949         int     grp;                                                    // group number
950         rtable_ent_t const*     rte;                    // route table entry added
951         char*   tokens[128];
952         char*   tok;                                            // pointer into a token or string
953         char    wbuf[1024];
954
955         if( ! buf ) {
956                 return;
957         }
958
959         if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
960                 write( ctx->snarf_rt_fd, buf, strlen( buf ) );
961                 write( ctx->snarf_rt_fd, "\n", 1 );
962         }
963
964         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
965                 buf++;
966         }
967         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
968         *(tok+1) = 0;
969
970         memset( tokens, 0, sizeof( tokens ) );
971         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
972                 tokens[0] = clip( tokens[0] );
973                 switch( *(tokens[0]) ) {
974                         case 0:                                                                                                 // ignore blanks
975                                 // fallthrough
976                         case '#':                                                                                               // and comment lines
977                                 break;
978
979                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
980                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
981                                         break;
982                                 }
983
984                                 if( ntoks < 3 ) {
985                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
986                                         break;
987                                 }
988
989                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
990                                 ctx->new_rtable->updates++;
991                                 break;
992
993                         case 'n':                                                                                               // newrt|{start|end}
994                                 tokens[1] = clip( tokens[1] );
995                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
996                                         if( ctx && ctx->snarf_rt_fd >= 0 ) {
997                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
998                                         }
999
1000                                         if( ntoks >2 ) {
1001                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1002                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1003                                                                 ctx->new_rtable->updates, tokens[2] );
1004                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
1005                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1006                                                         uta_rt_drop( ctx->new_rtable );
1007                                                         ctx->new_rtable = NULL;
1008                                                         break;
1009                                                 }
1010                                         }
1011
1012                                         if( ctx->new_rtable ) {
1013                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1014                                                 if( DEBUG > 1 || (vlevel > 1) ) {
1015                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
1016                                                         dump_tables( ctx );
1017                                                 }
1018
1019                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1020                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1021                                         } else {
1022                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
1023                                                 ctx->new_rtable = NULL;
1024                                         }
1025                                 } else {                                                                                                                        // start a new table.
1026                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
1027                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1028
1029                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1030                                                 uta_rt_drop( ctx->new_rtable );
1031                                                 ctx->new_rtable = NULL;
1032                                         }
1033
1034                                         if( ctx->table_id != NULL ) {
1035                                                 free( ctx->table_id );
1036                                         }
1037                                         if( ntoks >2 ) {
1038                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1039                                         } else {
1040                                                 ctx->table_id = NULL;
1041                                         }
1042
1043                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
1044                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
1045
1046                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
1047                                 }
1048                                 break;
1049
1050                         case 'm':                                                                       // mse entry or one of the meid_ records
1051                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
1052                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1053                                                 break;
1054                                         }
1055
1056                                         if( ntoks < 4 ) {
1057                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
1058                                                 break;
1059                                         }
1060
1061                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
1062                                         ctx->new_rtable->updates++;
1063                                 } else {
1064                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
1065                                 }
1066                                 break;
1067
1068                         case 'r':                                       // assume rt entry
1069                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1070                                         break;
1071                                 }
1072
1073                                 ctx->new_rtable->updates++;
1074                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
1075                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1076                                 } else {
1077                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
1078                                 }
1079                                 break;
1080
1081                         case 'u':                                                                                               // update current table, not a total replacement
1082                                 tokens[1] = clip( tokens[1] );
1083                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1084                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
1085                                                 break;
1086                                         }
1087                                         if( ctx->snarf_rt_fd >= 0 ) {
1088                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1089                                         }
1090
1091                                         if( ntoks >2 ) {
1092                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1093                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1094                                                                 ctx->new_rtable->updates, tokens[2] );
1095                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1096                                                         uta_rt_drop( ctx->new_rtable );
1097                                                         ctx->new_rtable = NULL;
1098                                                         break;
1099                                                 }
1100                                         }
1101
1102                                         if( ctx->new_rtable ) {
1103                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1104                                                 if( DEBUG > 1 || (vlevel > 1) )  {
1105                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1106                                                         dump_tables( ctx );
1107                                                 }
1108
1109                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1110                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1111                                         } else {
1112                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1113                                                 ctx->new_rtable = NULL;
1114                                         }
1115                                 } else {                                                                                        // start a new table.
1116                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
1117                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1118                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1119                                                 uta_rt_drop( ctx->new_rtable );
1120                                                 ctx->new_rtable = NULL;
1121                                         }
1122
1123                                         if( ntoks > 2 ) {
1124                                                 if( ctx->table_id != NULL ) {
1125                                                         free( ctx->table_id );
1126                                                 }
1127                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1128                                         }
1129
1130                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1131                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1132
1133                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1134                                 }
1135                                 break;
1136
1137                         default:
1138                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1139                                 break;
1140                 }
1141         }
1142 }
1143
1144 /*
1145         This function attempts to open a static route table in order to create a 'seed'
1146         table during initialisation.  The environment variable RMR_SEED_RT is expected
1147         to contain the necessary path to the file. If missing, or if the file is empty,
1148         no route table will be available until one is received from the generator.
1149
1150         This function is probably most useful for testing situations, or extreme
1151         cases where the routes are static.
1152 */
1153 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1154         int             i;
1155         char*   fname;
1156         char*   fbuf;                           // buffer with file contents
1157         char*   rec;                            // start of the record
1158         char*   eor;                            // end of the record
1159         int             rcount = 0;                     // record count for debug
1160
1161         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1162                 return;
1163         }
1164
1165         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1166                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1167                 return;
1168         }
1169
1170         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1171         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1172                 if( *eor == '\r' ) {
1173                         *eor = '\n';                                                            // will look like a blank line which is ok
1174                 }
1175         }
1176
1177         rec = fbuf;
1178         while( rec && *rec ) {
1179                 rcount++;
1180                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1181                         *eor = 0;
1182                 } else {
1183                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1184                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1185                         free( fbuf );
1186                         return;
1187                 }
1188
1189                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1190
1191                 rec = eor+1;
1192         }
1193
1194         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1195         free( fbuf );
1196 }
1197
1198 /*
1199         Callback driven for each thing in a symtab. We collect the pointers to those
1200         things for later use (cloning).
1201 */
1202 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1203         thing_list_t*   tl;
1204
1205         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1206                 return;
1207         }
1208
1209         if( thing == NULL ) {
1210                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1211                 return;
1212         }
1213
1214         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1215         tl->things[tl->nused++] = thing;                // save a reference to the thing
1216
1217         if( tl->nused >= tl->nalloc ) {
1218                 extend_things( tl );                            // not enough allocated
1219         }
1220 }
1221
1222 /*
1223         Called to delete a route table entry struct. We delete the array of endpoint
1224         pointers, but NOT the endpoints referenced as those are referenced from
1225         multiple entries.
1226
1227         Route table entries can be concurrently referenced by multiple symtabs, so
1228         the actual delete happens only if decrementing the rte's ref count takes it
1229         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1230         the symtab, or overlaying an entry.
1231
1232         This function uses ONLY the pointer to the rte (thing) and ignores the other
1233         information that symtab foreach function passes (st, entry, and data) which
1234         means that it _can_ safetly be used outside of the foreach setting. If
1235         the function is changed to depend on any of these three, then a stand-alone
1236         rte_cleanup() function should be added and referenced by this, and refererences
1237         to this outside of the foreach world should be changed.
1238 */
1239 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1240         rtable_ent_t*   rte;
1241         int i;
1242
1243         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1244                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1245                 return;
1246         }
1247
1248         rte->refs--;
1249         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1250                 return;
1251         }
1252
1253         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1254                 for( i = 0; i < rte->nrrgroups; i++ ) {
1255                         if( rte->rrgroups[i] ) {
1256                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1257                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1258                         }
1259
1260                 }
1261
1262                 free( rte->rrgroups );
1263         }
1264
1265         free( rte );                                                                                    // finally, drop the potato
1266 }
1267
1268 /*
1269         Read an entire file into a buffer. We assume for route table files
1270         they will be smallish and so this won't be a problem.
1271         Returns a pointer to the buffer, or nil. Caller must free.
1272         Terminates the buffer with a nil character for string processing.
1273
1274         If we cannot stat the file, we assume it's empty or missing and return
1275         an empty buffer, as opposed to a nil, so the caller can generate defaults
1276         or error if an empty/missing file isn't tolerated.
1277 */
1278 static char* uta_fib( char const* fname ) {
1279         struct stat     stats;
1280         off_t           fsize = 8192;   // size of the file
1281         off_t           nread;                  // number of bytes read
1282         int                     fd;
1283         char*           buf;                    // input buffer
1284
1285         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1286                 if( fstat( fd, &stats ) >= 0 ) {
1287                         if( stats.st_size <= 0 ) {                                      // empty file
1288                                 close( fd );
1289                                 fd = -1;
1290                         } else {
1291                                 fsize = stats.st_size;                                          // stat ok, save the file size
1292                         }
1293                 } else {
1294                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1295                 }
1296         }
1297
1298         if( fd < 0 ) {                                                                                  // didn't open or empty
1299                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1300                         return NULL;
1301                 }
1302
1303                 *buf = 0;
1304                 return buf;
1305         }
1306
1307         // add a size limit check here
1308
1309         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1310                 close( fd );
1311                 errno = ENOMEM;
1312                 return NULL;
1313         }
1314
1315         nread = read( fd, buf, fsize );
1316         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1317                 free( buf );
1318                 errno = EFBIG;                                                                                  // likely too much to handle
1319                 close( fd );
1320                 return NULL;
1321         }
1322
1323         buf[nread] = 0;
1324
1325         close( fd );
1326         return buf;
1327 }
1328
1329 // --------------------- initialisation/creation ---------------------------------------------
1330 /*
1331         Create and initialise a route table; Returns a pointer to the table struct.
1332 */
1333 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1334         route_table_t*  rt;
1335
1336         if( ctx == NULL ) {
1337                 return NULL;
1338         }
1339         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1340                 return NULL;
1341         }
1342
1343         memset( rt, 0, sizeof( *rt ) );
1344
1345         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1346                 free( rt );
1347                 return NULL;
1348         }
1349
1350         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1351         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1352         pthread_mutex_init( rt->gate, NULL );
1353
1354         return rt;
1355 }
1356
1357 /*
1358         Clones one of the spaces in the given table.
1359         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1360         Space is the space in the old table to copy. Space 0 uses an integer key and
1361         references rte structs. All other spaces use a string key and reference endpoints.
1362 */
1363 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1364         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1365         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1366         void*   sst;                    // source symtab
1367         void*   nst;                    // new symtab
1368         thing_list_t things;    // things from the space to copy
1369         int             i;
1370         int             free_on_err = 0;
1371
1372         if( ctx == NULL ) {
1373                 return NULL;
1374         }
1375         if( nrt == NULL ) {                             // make a new table if needed
1376                 free_on_err = 1;
1377                 nrt = uta_rt_init( ctx );
1378                 if( nrt == NULL ) {
1379                         return NULL;
1380                 }
1381         }
1382
1383         if( srt == NULL ) {             // source was nil, just give back the new table
1384                 return nrt;
1385         }
1386
1387         things.nalloc = 2048;
1388         things.nused = 0;
1389         things.error = 0;
1390         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1391         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1392         if( things.things == NULL || things.names == NULL ){
1393                 if( things.things != NULL ) {
1394                         free( things.things );
1395                 }
1396                 if( things.names != NULL ) {
1397                         free( things.names );
1398                 }
1399
1400                 if( free_on_err ) {
1401                         rmr_sym_free( nrt->hash );
1402                         free( nrt );
1403                         nrt = NULL;
1404                 } else {
1405                         nrt->error = 1;
1406                 }
1407
1408                 return nrt;
1409         }
1410         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1411         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1412
1413         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1414         nst = nrt->hash;
1415
1416         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1417         if( things.error ) {                            // something happened and capture failed
1418                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1419                 free( things.things );
1420                 free( things.names );
1421                 if( free_on_err ) {
1422                         rmr_sym_free( nrt->hash );
1423                         free( nrt );
1424                         nrt = NULL;
1425                 } else {
1426                         nrt->error = 1;
1427                 }
1428                 return nrt;
1429         }
1430
1431         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1432         for( i = 0; i < things.nused; i++ ) {
1433                 if( space ) {                                                                                           // string key, epoint reference
1434                         ep = (endpoint_t *) things.things[i];
1435                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1436                 } else {
1437                         rte = (rtable_ent_t *) things.things[i];
1438                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1439                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1440                 }
1441         }
1442
1443         free( things.things );
1444         free( (void *) things.names );
1445         return nrt;
1446 }
1447
1448 /*
1449         Given a destination route table (drt), clone from the source (srt) into it.
1450         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1451         allocate the drt if that was nil too). If all is true (1), then we will clone both
1452         the MT and the ME spaces; otherwise only the ME space is cloned.
1453 */
1454 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1455         endpoint_t*             ep;                             // an endpoint
1456         rtable_ent_t*   rte;                    // a route table entry
1457         int i;
1458
1459         if( ctx == NULL ) {
1460                 return NULL;
1461         }
1462         if( drt == NULL ) {
1463                 drt = uta_rt_init( ctx );
1464         }
1465         if( srt == NULL ) {
1466                 return drt;
1467         }
1468
1469         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1470         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1471         if( all ) {
1472                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1473         }
1474
1475         return drt;
1476 }
1477
1478 /*
1479         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1480         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1481         context to be referenced by the new pointer; the old pointer is set to nil.
1482
1483         If the old table doesn't exist, then a new table is created and the new pointer is
1484         set to reference it.
1485
1486         The ME namespace references endpoints which do not need to be released, therefore we
1487         do not need to run that portion of the table to deref like we do for the RTEs.
1488 */
1489 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1490         int counter = 0;
1491         route_table_t*  rt;
1492
1493         if( ctx == NULL ) {
1494                 return NULL;
1495         }
1496
1497         if( (rt = ctx->old_rtable) != NULL ) {
1498                 ctx->old_rtable = NULL;
1499                 while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
1500                         if( counter++ > 1000 ) {
1501                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1502                                 break;
1503                         }
1504
1505                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1506                 }
1507
1508                 if( rt->hash != NULL ) {
1509                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1510                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1511                 }
1512
1513                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1514         } else {
1515                 rt = NULL;
1516         }
1517
1518         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1519         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1520                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1521         } else {
1522                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1523                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1524                 rt->error = 1;
1525         }
1526
1527         return rt;
1528 }
1529
1530
1531 /*
1532         Given a name, find the endpoint struct in the provided route table.
1533 */
1534 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1535
1536         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1537                 return NULL;
1538         }
1539
1540         return rmr_sym_get( rt->ephash, ep_name, 1 );
1541 }
1542
1543 /*
1544         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1545         Does NOT destroy the gate as it's a common gate for ALL route tables.
1546 */
1547 static void uta_rt_drop( route_table_t* rt ) {
1548         if( rt == NULL ) {
1549                 return;
1550         }
1551
1552         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1553         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1554         free( rt );
1555 }
1556
1557 /*
1558         Look up and return the pointer to the endpoint stuct matching the given name.
1559         If not in the hash, a new endpoint is created, added to the hash. Should always
1560         return a pointer.
1561 */
1562 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1563         endpoint_t*     ep;
1564
1565         if( !rt || !ep_name || ! *ep_name ) {
1566                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1567                 errno = EINVAL;
1568                 return NULL;
1569         }
1570
1571         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1572                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1573                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1574                         errno = ENOMEM;
1575                         return NULL;
1576                 }
1577
1578                 ep->notify = 1;                                                         // show notification on first connection failure
1579                 ep->open = 0;                                                           // not connected
1580                 ep->addr = uta_h2ip( ep_name );
1581                 ep->name = strdup( ep_name );
1582                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1583                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1584
1585                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1586         }
1587
1588         return ep;
1589 }
1590
1591
1592 /*
1593         Given a session id and message type build a key that can be used to look up the rte in the route
1594         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1595 */
1596 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1597         uint64_t key;
1598
1599         if( sub_id == UNSET_SUBID ) {
1600                 key = 0xffffffff00000000 | mtype;
1601         } else {
1602                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1603         }
1604
1605         return key;
1606 }
1607
1608 /*
1609         Given a route table and meid string, find the owner (if known). Returns a pointer to
1610         the endpoint struct or nil.
1611 */
1612 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1613         endpoint_t const* ep;           // the ep we found in the hash
1614
1615         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1616                 return NULL;
1617         }
1618
1619         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1620 }
1621
1622 /*
1623         This returns a pointer to the currently active route table and ups
1624         the reference count so that the route table is not freed while it
1625         is being used. The caller MUST call release_rt() when finished
1626         with the pointer.
1627
1628         Care must be taken: the ctx->rtable pointer _could_ change during the time
1629         between the release of the lock and the return. Therefore we MUST grab
1630         the current pointer when we have the lock so that if it does we don't
1631         return a pointer to the wrong table.
1632
1633         This will return NULL if there is no active table.
1634 */
1635 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1636         route_table_t*  rrt;                    // return value
1637
1638         if( ctx == NULL || ctx->rtable == NULL ) {
1639                 return NULL;
1640         }
1641
1642         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1643         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1644         rrt->ref_count++;
1645         pthread_mutex_unlock( ctx->rtgate );
1646
1647         return rrt;                                                                             // pointer we upped the count with
1648 }
1649
1650 /*
1651         This will "release" the route table by reducing the use counter
1652         in the table. The table may not be freed until the counter reaches
1653         0, so it's imparative that the pointer be "released" when it is
1654         fetched by get_rt().  Once the caller has released the table it
1655         may not safely use the pointer that it had.
1656 */
1657 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1658         if( ctx == NULL || rt == NULL ) {
1659                 return;
1660         }
1661
1662         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1663         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1664                 rt->ref_count--;
1665         }
1666         pthread_mutex_unlock( ctx->rtgate );
1667 }
1668 #endif