Add alarm generation when application is slow
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49
50 #include <RIC_message_types.h>          // needed for route manager messages
51
52 #define ALL 1
53 #define SOME 0
54
55 /*
56         Passed to a symtab foreach callback to construct a list of pointers from
57         a current symtab.
58 */
59 typedef struct thing_list {
60         int error;                                      // if a realloc failed, this will be set
61         int nalloc;
62         int nused;
63         void** things;
64         const char** names;
65 } thing_list_t;
66
67 // ---- debugging/testing -------------------------------------------------------------------------
68
69 /*
70         Dump some stats for an endpoint in the RT. This is generally called to
71         verify endpoints after a table load/change.
72
73         This is called by the for-each mechanism of the symtab and the prototype is
74         fixe; we don't really use some of the parms, but have dummy references to
75         keep sonar from complaining.
76 */
77 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
78         int*    counter;
79         endpoint_t* ep;
80
81         if( (ep = (endpoint_t *) thing) == NULL ) {
82                 return;
83         }
84
85         if( (counter = (int *) vcounter) != NULL ) {
86                 (*counter)++;
87         } else {
88                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
89                 return;
90         }
91
92         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
93 }
94
95 /*
96         Called to count meid entries in the table. The meid points to an 'owning' endpoint
97         so we can list what we find
98
99         See note in ep_stats about dummy refs.
100 */
101 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
102         int*    counter;
103         endpoint_t* ep;
104
105         if( (ep = (endpoint_t *) thing) == NULL ) {
106                 return;
107         }
108
109         if( (counter = (int *) vcounter) != NULL ) {
110                 (*counter)++;
111         } else {
112                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
113         }
114
115         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
116 }
117
118 /*
119         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
120         the 'source' information and is added to each message.
121
122         See note above about dummy references.
123 */
124 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
125         endpoint_t* ep;
126         char*   id;
127
128         if( (ep = (endpoint_t *) thing) == NULL ) {
129                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
130                 return;
131         }
132
133         if( (id = (char *) vid) == NULL ) {
134                 id = "missing";
135         }
136
137         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
138                 (long long) time( NULL ),
139                 id,
140                 ep->name,
141                 ep->open,
142                 ep->scounts[EPSC_GOOD],
143                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
144                 ep->scounts[EPSC_FAIL],
145                 ep->scounts[EPSC_TRANS]   );
146 }
147
148 /*
149         Dump stats for a route entry in the table.
150 */
151 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
152         int*    counter;
153         rtable_ent_t const* rte;                // thing is really an rte
154         int             mtype;
155         int             sid;
156
157         if( (rte = (rtable_ent_t *) thing) == NULL ) {
158                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
159                 return;
160         }
161
162         if( (counter = (int *) vcounter) != NULL ) {
163                 (*counter)++;
164         }
165
166         mtype = rte->key & 0xffff;
167         sid = (int) (rte->key >> 32);
168
169         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
170 }
171
172 /*
173         Given a route table, cause some stats to be spit out.
174 */
175 static void  rt_stats( route_table_t* rt ) {
176         int* counter;
177
178         if( rt == NULL ) {
179                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
180                 return;
181         }
182
183         counter = (int *) malloc( sizeof( int ) );
184         *counter = 0;
185         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
186         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
187         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
188         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
189
190         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
191         *counter = 0;
192         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
193         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
194
195         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
196         *counter = 0;
197         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
198         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
199
200         free( counter );
201 }
202
203 /*
204         Given a route table, cause endpoint counters to be written to stderr. The id
205         parm is written as the "source" in the output.
206 */
207 static void  rt_epcounts( route_table_t* rt, char* id ) {
208         if( rt == NULL ) {
209                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
210                 return;
211         }
212
213         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
214 }
215
216
217 static void dump_tables( uta_ctx_t *ctx ) {
218         if( ctx->old_rtable != NULL ) {
219                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
220                 rt_stats( ctx->old_rtable );
221         } else {
222                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
223         }
224         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
225         rt_stats( ctx->rtable );
226 }
227
228 // ------------ route manager communication -------------------------------------------------
229 /*
230         Send a request for a table update to the route manager. Updates come in
231         async, so send and go.
232
233         pctx is the private context for the thread; ctx is the application context
234         that we need to be able to send the application ID in case rt mgr needs to
235         use it to idenfity us.
236
237         Returns 0 if we were not able to send a request.
238 */
239 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
240         rmr_mbuf_t*     smsg;
241         int     state = 0;
242
243         if( ctx->rtg_whid < 0 ) {
244                 return state;
245         }
246
247         smsg = rmr_alloc_msg( pctx, 1024 );
248         if( smsg != NULL ) {
249                 smsg->mtype = RMRRM_REQ_TABLE;
250                 smsg->sub_id = 0;
251                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
252                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
253                 smsg->len = strlen( smsg->payload ) + 1;
254
255                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
256                 if( (state = smsg->state) != RMR_OK ) {
257                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
258                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
259                         ctx->rtg_whid = -1;
260                 }
261
262                 rmr_free_msg( smsg );
263         }
264
265         return state;
266 }
267
268 /*
269         Send an ack to the route table manager for a table ID that we are
270         processing.      State is 1 for OK, and 0 for failed. Reason might
271         be populated if we know why there was a failure.
272
273         Context should be the PRIVATE context that we use for messages
274         to route manger and NOT the user's context.
275
276         If a message buffere is passed we use that and use return to sender
277         assuming that this might be a response to a call and that is needed
278         to send back to the proper calling thread. If msg is nil, we allocate
279         and use it.
280 */
281 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
282         int             use_rts = 1;
283         int             payload_size = 1024;
284
285         if( ctx == NULL || ctx->rtg_whid < 0 ) {
286                 return;
287         }
288
289         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
290                 return;
291         }
292
293         if( smsg != NULL ) {
294                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
295         } else {
296                 use_rts = 0;
297                 smsg = rmr_alloc_msg( ctx, payload_size );
298         }
299
300         if( smsg != NULL ) {
301                 smsg->mtype = RMRRM_TABLE_STATE;
302                 smsg->sub_id = -1;
303                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
304                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
305
306                 smsg->len = strlen( smsg->payload ) + 1;
307
308                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
309                 if( use_rts ) {
310                         smsg = rmr_rts_msg( ctx, smsg );
311                 } else {
312                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
313                 }
314                 if( (state = smsg->state) != RMR_OK ) {
315                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
316                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
317                         ctx->rtg_whid = -1;
318                 }
319
320                 if( ! use_rts ) {
321                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
322                 }
323         }
324 }
325
326 // ---- alarm generation --------------------------------------------------------------------------
327
328 /*
329         Given the user's context (not the thread private context) look to see if the application isn't
330         working fast enough and we're dropping messages. If the drop counter has changed since the last
331         peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
332         set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
333
334         The private context is what we use to send so as not to interfere with the user flow.
335 */
336 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
337         static  int alarm_raised = 0;
338         static  int ok2clear = 0;                                       // time that we can clear
339         static  int lastd = 0;                                          // the last counter value so we can compute delta
340         static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
341
342         rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
343         if( ! alarm_raised ) {
344                 if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
345                         return;
346                 }
347
348                 alarm_raised = 1;
349                 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
350                 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
351         } else {
352                 if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
353                         lastd = uctx->dcount;
354                         ok2clear = 0;                                                   // reset the timer
355                         return;
356                 }
357
358                 if( ok2clear == 0 ) {                                           // first round where not dropping
359                         ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
360                 } else {
361                         if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
362                                 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
363                                 alarm_raised = 0;
364                                 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id++, "RMR message dropping has stopped" );
365                         }
366                 }
367         }
368 }
369
370 // ---- utility -----------------------------------------------------------------------------------
371 /*
372         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
373         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
374 */
375 static char* clip( char* buf ) {
376         char*   tok;
377
378         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
379                 buf++;
380         }
381
382         if( (tok = strchr( buf, '#' )) != NULL ) {
383                 if( tok == buf ) {
384                         return buf;                                     // just push back; leading comment sym handled there
385                 }
386
387                 if( isspace( *(tok-1) ) ) {
388                         *tok = 0;
389                 }
390         }
391
392         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
393         *(tok+1) = 0;
394
395         return buf;
396 }
397
398 /*
399         This accepts a pointer to a nil terminated string, and ensures that there is a
400         newline as the last character. If there is not, a new buffer is allocated and
401         the newline is added.  If a new buffer is allocated, the buffer passed in is
402         freed.  The function returns a pointer which the caller should use, and must
403         free.  In the event of an error, a nil pointer is returned.
404 */
405 static char* ensure_nlterm( char* buf ) {
406         char*   nb = NULL;
407         int             len = 0;
408
409         if( buf != NULL ) {
410                 len = strlen( buf );
411         }
412
413         nb = buf;                                                       // default to returning original as is
414         switch( len ) {
415                 case 0:
416                         nb = strdup( "\n" );
417                         break;
418
419                 case 1:
420                         if( *buf != '\n' ) {            // not a newline; realloc
421                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
422                                 nb = strdup( " \n" );
423                                 *nb = *buf;
424                                 free( buf );
425                         }
426                         break;
427
428                 default:
429                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
430                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
431                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
432                                         memcpy( nb, buf, len );
433                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
434                                         *(nb+len+1) = 0;
435                                         free( buf );
436                                 }
437                         }
438                         break;
439         }
440
441         return nb;
442 }
443
444 /*
445         Roll the new table into the active and the active into the old table. We
446         must have the lock on the active table to do this. It's possible that there
447         is no active table (first load), so we have to account for that (no locking).
448 */
449 static void roll_tables( uta_ctx_t* ctx ) {
450
451         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
452                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
453                 ctx->old_rtable = ctx->new_rtable;
454                 ctx->new_rtable = NULL;
455                 return;
456         }
457
458         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
459                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
460                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
461                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
462                 pthread_mutex_unlock( ctx->rtgate );
463         } else {
464                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
465                 ctx->rtable = ctx->new_rtable;                          // make new the active one
466         }
467
468         ctx->new_rtable = NULL;
469 }
470
471 /*
472         Given a thing list, extend the array of pointers by 1/2 of the current
473         number allocated. If we cannot realloc an array, then we set the error
474         flag.  Unlikely, but will prevent a crash, AND will prevent us from
475         trying to use the table since we couldn't capture everything.
476 */
477 static void extend_things( thing_list_t* tl ) {
478         int old_alloc;
479         void*   old_things;
480         void*   old_names;
481
482         if( tl == NULL ) {
483                 return;
484         }
485
486         old_alloc = tl->nalloc;                         // capture current things
487         old_things = tl->things;
488         old_names = tl->names;
489
490         tl->nalloc += tl->nalloc/2;                     // new allocation size
491
492         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
493         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
494
495         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
496                 tl->error = 1;
497                 return;
498         }
499
500         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
501         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
502
503         free( old_things );
504         free( old_names );
505 }
506
507 // ------------ entry update functions ---------------------------------------------------------------
508 /*
509         Given a message type create a route table entry and add to the hash keyed on the
510         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
511         is the number of group slots to allocate in the entry.
512 */
513 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
514         rtable_ent_t* rte;
515         rtable_ent_t* old_rte;          // entry which was already in the table for the key
516
517         if( rt == NULL ) {
518                 return NULL;
519         }
520
521         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
522                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
523                 return NULL;
524         }
525         memset( rte, 0, sizeof( *rte ) );
526         rte->refs = 1;
527         rte->key = key;
528
529         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
530                 nrrgroups = 10;
531         }
532
533         if( nrrgroups ) {
534                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
535                         free( rte );
536                         return NULL;
537                 }
538                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
539         } else {
540                 rte->rrgroups = NULL;
541         }
542
543         rte->nrrgroups = nrrgroups;
544
545         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
546                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
547         }
548
549         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
550
551         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
552         return rte;
553 }
554
555 /*
556         This accepts partially parsed information from an rte or mse record sent by route manager or read from
557         a file such that:
558                 ts_field is the msg-type,sender field
559                 subid is the integer subscription id
560                 rr_field is the endpoint information for round robening message over
561
562         If all goes well, this will add an RTE to the table under construction.
563
564         The ts_field is checked to see if we should ingest this record. We ingest if one of
565         these is true:
566                 there is no sender info (a generic entry for all)
567                 there is sender and our host:port matches one of the senders
568                 the sender info is an IP address that matches one of our IP addresses
569 */
570 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
571         rtable_ent_t*   rte;            // route table entry added
572         char const*     tok;
573         int             ntoks;
574         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
575         char*   tokens[128];
576         char*   gtokens[64];
577         int             i;
578         int             ngtoks;                         // number of tokens in the group list
579         int             grp;                            // index into group list
580         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
581         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
582
583         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
584         rr_field = clip( rr_field );
585
586         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
587                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
588                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
589
590                 key = build_rt_key( subid, atoi( ts_field ) );
591
592                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
593
594                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
595                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
596                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
597                         }
598                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
599                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
600
601                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
602                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
603                                         for( i = 0; i < ntoks; i++ ) {
604                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
605                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
606                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
607                                                         has_ep = TRUE;
608                                                 }
609                                         }
610                                         if( has_ep ) {
611                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
612                                                 has_ep = FALSE;
613                                         }
614                                 }
615                         }
616                 }
617         } else {
618                 if( DEBUG || (vlevel > 2) ) {
619                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
620                 }
621         }
622 }
623
624 /*
625         Trash_entry takes a partially parsed record from the input and
626         will delete the entry if the sender,mtype matches us or it's a
627         generic mtype. The refernce in the new table is removed and the
628         refcounter for the actual rte is decreased. If that ref count is
629         0 then the memory is freed (handled byh the del_rte call).
630 */
631 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
632         rtable_ent_t*   rte;            // route table entry to be 'deleted'
633         char const*     tok;
634         int             ntoks;
635         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
636         char*   tokens[128];
637
638         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
639                 return;
640         }
641
642         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
643
644         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
645                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
646                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
647
648                 key = build_rt_key( subid, atoi( ts_field ) );
649                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
650                 if( rte != NULL ) {
651                         if( DEBUG || (vlevel > 1) ) {
652                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
653                         }
654                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
655                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
656                 } else {
657                         if( DEBUG || (vlevel > 1) ) {
658                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
659                         }
660                 }
661         } else {
662                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
663         }
664 }
665
666 // -------------------------- parse functions --------------------------------------------------
667
668 /*
669         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
670         the 'owner' which should be the dns name or IP address of an enpoint
671         the meid_list is a space separated list of me IDs
672
673         This function assumes the caller has vetted the pointers as needed.
674
675         For each meid in the list, an entry is pushed into the hash which references the owner
676         endpoint such that when the meid is used to route a message it references the endpoint
677         to send messages to.
678 */
679 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
680         char const*     tok;
681         int             ntoks;
682         char*   tokens[128];
683         int             i;
684         int             state;
685         endpoint_t*     ep;                                             // endpoint struct for the owner
686
687         owner = clip( owner );                          // ditch extra whitespace and trailing comments
688         meid_list = clip( meid_list );
689
690         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
691         for( i = 0; i < ntoks; i++ ) {
692                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
693                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
694                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
695                 } else {
696                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
697                 }
698         }
699 }
700
701 /*
702         Given the tokens from an mme_del, delete the listed meid entries from the new
703         table. The list is a space separated list of meids.
704
705         The meids in the hash reference endpoints which are never deleted and so
706         the only thing that we need to do here is to remove the meid from the hash.
707
708         This function assumes the caller has vetted the pointers as needed.
709 */
710 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
711         char const*     tok;
712         int             ntoks;
713         char*   tokens[128];
714         int             i;
715
716         if( rtab->hash == NULL ) {
717                 return;
718         }
719
720         meid_list = clip( meid_list );
721
722         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
723         for( i = 0; i < ntoks; i++ ) {
724                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
725                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
726         }
727 }
728
729 /*
730         Parse a partially parsed meid record. Tokens[0] should be one of:
731                 meid_map, mme_ar, mme_del.
732
733         pctx is the private context needed to return an ack/nack using the provided
734         message buffer with the route managers address info.
735 */
736 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
737         char wbuf[1024];
738
739         if( tokens == NULL || ntoks < 1 ) {
740                 return;                                                 // silent but should never happen
741         }
742
743         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
744                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
745                 return;
746         }
747
748         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
749                 tokens[1] = clip( tokens[1] );
750                 if( *(tokens[1]) == 's' ) {
751                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
752                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
753                                 uta_rt_drop( ctx->new_rtable );
754                                 ctx->new_rtable = NULL;
755                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
756                         }
757
758                         if( ctx->table_id != NULL ) {
759                                 free( ctx->table_id );
760                         }
761                         if( ntoks > 2 ) {
762                                 ctx->table_id = strdup( clip( tokens[2] ) );
763                         } else {
764                                 ctx->table_id = NULL;
765                         }
766
767                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
768                         ctx->new_rtable->mupdates = 0;
769
770                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
771                 } else {
772                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
773                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
774                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
775                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
776                                                                 ctx->new_rtable->mupdates, tokens[2] );
777                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
778                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
779                                                 uta_rt_drop( ctx->new_rtable );
780                                                 ctx->new_rtable = NULL;
781                                                 return;
782                                         }
783
784                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
785                                 }
786
787                                 if( ctx->new_rtable ) {
788                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
789                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
790                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
791
792                                         if( vlevel > 0 ) {
793                                                 if( ctx->old_rtable != NULL ) {
794                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
795                                                         rt_stats( ctx->old_rtable );
796                                                 } else {
797                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
798                                                 }
799                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
800                                                 rt_stats( ctx->rtable );
801                                         }
802                                 } else {
803                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
804                                         ctx->new_rtable = NULL;
805                                 }
806                         }
807                 }
808
809                 return;
810         }
811
812         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
813                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
814                 return;
815         }
816
817         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
818                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
819                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
820                         return;
821                 }
822                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
823                 ctx->new_rtable->mupdates++;
824                 return;
825         }
826
827         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
828                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
829                 ctx->new_rtable->mupdates++;
830                 return;
831         }
832 }
833
834 /*
835         Parse a single record recevied from the route table generator, or read
836         from a static route table file.  Start records cause a new table to
837         be started (if a partial table was received it is discarded. Table
838         entry records are added to the currenly 'in progress' table, and an
839         end record causes the in progress table to be finalised and the
840         currently active table is replaced.
841
842         The updated table will be activated when the *|end record is encountered.
843         However, to allow for a "double" update, where both the meid map and the
844         route table must be updated at the same time, the end indication on a
845         route table (new or update) may specifiy "hold" which indicates that meid
846         map entries are to follow and the updated route table should be held as
847         pending until the end of the meid map is received and validated.
848
849         CAUTION:  we are assuming that there is a single route/meid map generator
850                 and as such only one type of update is received at a time; in other
851                 words, the sender cannot mix update records and if there is more than
852                 one sender process they must synchronise to avoid issues.
853
854
855         For a RT update, we expect:
856                 newrt | start | <table-id>
857                 newrt | end | <count>
858                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
859                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
860                 mse| <mtype>[,sender] | <sub-id> | %meid
861
862
863         For a meid map update we expect:
864                 meid_map | start | <table-id>
865                 meid_map | end | <count> | <md5-hash>
866                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
867                 mme_del | <meid0> <meid1>...<meidn>
868
869
870         The pctx is our private context that must be used to send acks/status
871         messages back to the route manager.  The regular ctx is the ctx that
872         the user has been given and thus that's where we have to hang the route
873         table we're working with.
874
875         If mbuf is given, and we need to ack, then we ack using the mbuf and a
876         return to sender call (allows route manager to use wh_call() to send
877         an update and rts is required to get that back to the right thread).
878         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
879         will be used.
880 */
881 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
882         int i;
883         int ntoks;                                                      // number of tokens found in something
884         int ngtoks;
885         int     grp;                                                    // group number
886         rtable_ent_t const*     rte;                    // route table entry added
887         char*   tokens[128];
888         char*   tok;                                            // pointer into a token or string
889         char    wbuf[1024];
890
891         if( ! buf ) {
892                 return;
893         }
894
895         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
896                 buf++;
897         }
898         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
899         *(tok+1) = 0;
900
901         memset( tokens, 0, sizeof( tokens ) );
902         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
903                 tokens[0] = clip( tokens[0] );
904                 switch( *(tokens[0]) ) {
905                         case 0:                                                                                                 // ignore blanks
906                                 // fallthrough
907                         case '#':                                                                                               // and comment lines
908                                 break;
909
910                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
911                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
912                                         break;
913                                 }
914
915                                 if( ntoks < 3 ) {
916                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
917                                         break;
918                                 }
919
920                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
921                                 ctx->new_rtable->updates++;
922                                 break;
923
924                         case 'n':                                                                                               // newrt|{start|end}
925                                 tokens[1] = clip( tokens[1] );
926                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
927                                         if( ntoks >2 ) {
928                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
929                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
930                                                                 ctx->new_rtable->updates, tokens[2] );
931                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
932                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
933                                                         uta_rt_drop( ctx->new_rtable );
934                                                         ctx->new_rtable = NULL;
935                                                         break;
936                                                 }
937                                         }
938
939                                         if( ctx->new_rtable ) {
940                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
941                                                 if( DEBUG > 1 || (vlevel > 1) ) {
942                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
943                                                         dump_tables( ctx );
944                                                 }
945
946                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
947                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
948                                         } else {
949                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
950                                                 ctx->new_rtable = NULL;
951                                         }
952                                 } else {                                                                                                                        // start a new table.
953                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
954                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
955
956                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
957                                                 uta_rt_drop( ctx->new_rtable );
958                                                 ctx->new_rtable = NULL;
959                                         }
960
961                                         if( ctx->table_id != NULL ) {
962                                                 free( ctx->table_id );
963                                         }
964                                         if( ntoks >2 ) {
965                                                 ctx->table_id = strdup( clip( tokens[2] ) );
966                                         } else {
967                                                 ctx->table_id = NULL;
968                                         }
969
970                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
971                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
972
973                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
974                                 }
975                                 break;
976
977                         case 'm':                                                                       // mse entry or one of the meid_ records
978                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
979                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
980                                                 break;
981                                         }
982
983                                         if( ntoks < 4 ) {
984                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
985                                                 break;
986                                         }
987
988                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
989                                         ctx->new_rtable->updates++;
990                                 } else {
991                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
992                                 }
993                                 break;
994
995                         case 'r':                                       // assume rt entry
996                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
997                                         break;
998                                 }
999
1000                                 ctx->new_rtable->updates++;
1001                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
1002                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1003                                 } else {
1004                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
1005                                 }
1006                                 break;
1007
1008                         case 'u':                                                                                               // update current table, not a total replacement
1009                                 tokens[1] = clip( tokens[1] );
1010                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1011                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
1012                                                 break;
1013                                         }
1014
1015                                         if( ntoks >2 ) {
1016                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1017                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1018                                                                 ctx->new_rtable->updates, tokens[2] );
1019                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1020                                                         uta_rt_drop( ctx->new_rtable );
1021                                                         ctx->new_rtable = NULL;
1022                                                         break;
1023                                                 }
1024                                         }
1025
1026                                         if( ctx->new_rtable ) {
1027                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1028                                                 if( DEBUG > 1 || (vlevel > 1) )  {
1029                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1030                                                         dump_tables( ctx );
1031                                                 }
1032
1033                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1034                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1035                                         } else {
1036                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1037                                                 ctx->new_rtable = NULL;
1038                                         }
1039                                 } else {                                                                                        // start a new table.
1040                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
1041                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1042                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1043                                                 uta_rt_drop( ctx->new_rtable );
1044                                                 ctx->new_rtable = NULL;
1045                                         }
1046
1047                                         if( ntoks > 2 ) {
1048                                                 if( ctx->table_id != NULL ) {
1049                                                         free( ctx->table_id );
1050                                                 }
1051                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1052                                         }
1053
1054                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1055                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1056
1057                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1058                                 }
1059                                 break;
1060
1061                         default:
1062                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1063                                 break;
1064                 }
1065         }
1066 }
1067
1068 /*
1069         This function attempts to open a static route table in order to create a 'seed'
1070         table during initialisation.  The environment variable RMR_SEED_RT is expected
1071         to contain the necessary path to the file. If missing, or if the file is empty,
1072         no route table will be available until one is received from the generator.
1073
1074         This function is probably most useful for testing situations, or extreme
1075         cases where the routes are static.
1076 */
1077 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1078         int             i;
1079         char*   fname;
1080         char*   fbuf;                           // buffer with file contents
1081         char*   rec;                            // start of the record
1082         char*   eor;                            // end of the record
1083         int             rcount = 0;                     // record count for debug
1084
1085         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1086                 return;
1087         }
1088
1089         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1090                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1091                 return;
1092         }
1093
1094         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1095         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1096                 if( *eor == '\r' ) {
1097                         *eor = '\n';                                                            // will look like a blank line which is ok
1098                 }
1099         }
1100
1101         rec = fbuf;
1102         while( rec && *rec ) {
1103                 rcount++;
1104                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1105                         *eor = 0;
1106                 } else {
1107                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1108                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1109                         free( fbuf );
1110                         return;
1111                 }
1112
1113                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1114
1115                 rec = eor+1;
1116         }
1117
1118         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1119         free( fbuf );
1120 }
1121
1122 /*
1123         Callback driven for each thing in a symtab. We collect the pointers to those
1124         things for later use (cloning).
1125 */
1126 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1127         thing_list_t*   tl;
1128
1129         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1130                 return;
1131         }
1132
1133         if( thing == NULL ) {
1134                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1135                 return;
1136         }
1137
1138         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1139         tl->things[tl->nused++] = thing;                // save a reference to the thing
1140
1141         if( tl->nused >= tl->nalloc ) {
1142                 extend_things( tl );                            // not enough allocated
1143         }
1144 }
1145
1146 /*
1147         Called to delete a route table entry struct. We delete the array of endpoint
1148         pointers, but NOT the endpoints referenced as those are referenced from
1149         multiple entries.
1150
1151         Route table entries can be concurrently referenced by multiple symtabs, so
1152         the actual delete happens only if decrementing the rte's ref count takes it
1153         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1154         the symtab, or overlaying an entry.
1155
1156         This function uses ONLY the pointer to the rte (thing) and ignores the other
1157         information that symtab foreach function passes (st, entry, and data) which
1158         means that it _can_ safetly be used outside of the foreach setting. If
1159         the function is changed to depend on any of these three, then a stand-alone
1160         rte_cleanup() function should be added and referenced by this, and refererences
1161         to this outside of the foreach world should be changed.
1162 */
1163 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1164         rtable_ent_t*   rte;
1165         int i;
1166
1167         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1168                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1169                 return;
1170         }
1171
1172         rte->refs--;
1173         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1174                 return;
1175         }
1176
1177         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1178                 for( i = 0; i < rte->nrrgroups; i++ ) {
1179                         if( rte->rrgroups[i] ) {
1180                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1181                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1182                         }
1183
1184                 }
1185
1186                 free( rte->rrgroups );
1187         }
1188
1189         free( rte );                                                                                    // finally, drop the potato
1190 }
1191
1192 /*
1193         Read an entire file into a buffer. We assume for route table files
1194         they will be smallish and so this won't be a problem.
1195         Returns a pointer to the buffer, or nil. Caller must free.
1196         Terminates the buffer with a nil character for string processing.
1197
1198         If we cannot stat the file, we assume it's empty or missing and return
1199         an empty buffer, as opposed to a nil, so the caller can generate defaults
1200         or error if an empty/missing file isn't tolerated.
1201 */
1202 static char* uta_fib( char const* fname ) {
1203         struct stat     stats;
1204         off_t           fsize = 8192;   // size of the file
1205         off_t           nread;                  // number of bytes read
1206         int                     fd;
1207         char*           buf;                    // input buffer
1208
1209         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1210                 if( fstat( fd, &stats ) >= 0 ) {
1211                         if( stats.st_size <= 0 ) {                                      // empty file
1212                                 close( fd );
1213                                 fd = -1;
1214                         } else {
1215                                 fsize = stats.st_size;                                          // stat ok, save the file size
1216                         }
1217                 } else {
1218                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1219                 }
1220         }
1221
1222         if( fd < 0 ) {                                                                                  // didn't open or empty
1223                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1224                         return NULL;
1225                 }
1226
1227                 *buf = 0;
1228                 return buf;
1229         }
1230
1231         // add a size limit check here
1232
1233         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1234                 close( fd );
1235                 errno = ENOMEM;
1236                 return NULL;
1237         }
1238
1239         nread = read( fd, buf, fsize );
1240         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1241                 free( buf );
1242                 errno = EFBIG;                                                                                  // likely too much to handle
1243                 close( fd );
1244                 return NULL;
1245         }
1246
1247         buf[nread] = 0;
1248
1249         close( fd );
1250         return buf;
1251 }
1252
1253 // --------------------- initialisation/creation ---------------------------------------------
1254 /*
1255         Create and initialise a route table; Returns a pointer to the table struct.
1256 */
1257 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1258         route_table_t*  rt;
1259
1260         if( ctx == NULL ) {
1261                 return NULL;
1262         }
1263         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1264                 return NULL;
1265         }
1266
1267         memset( rt, 0, sizeof( *rt ) );
1268
1269         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1270                 free( rt );
1271                 return NULL;
1272         }
1273
1274         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1275         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1276         pthread_mutex_init( rt->gate, NULL );
1277
1278         return rt;
1279 }
1280
1281 /*
1282         Clones one of the spaces in the given table.
1283         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1284         Space is the space in the old table to copy. Space 0 uses an integer key and
1285         references rte structs. All other spaces use a string key and reference endpoints.
1286 */
1287 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1288         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1289         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1290         void*   sst;                    // source symtab
1291         void*   nst;                    // new symtab
1292         thing_list_t things;    // things from the space to copy
1293         int             i;
1294         int             free_on_err = 0;
1295
1296         if( ctx == NULL ) {
1297                 return NULL;
1298         }
1299         if( nrt == NULL ) {                             // make a new table if needed
1300                 free_on_err = 1;
1301                 nrt = uta_rt_init( ctx );
1302                 if( nrt == NULL ) {
1303                         return NULL;
1304                 }
1305         }
1306
1307         if( srt == NULL ) {             // source was nil, just give back the new table
1308                 return nrt;
1309         }
1310
1311         things.nalloc = 2048;
1312         things.nused = 0;
1313         things.error = 0;
1314         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1315         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1316         if( things.things == NULL || things.names == NULL ){
1317                 if( things.things != NULL ) {
1318                         free( things.things );
1319                 }
1320                 if( things.names != NULL ) {
1321                         free( things.names );
1322                 }
1323
1324                 if( free_on_err ) {
1325                         rmr_sym_free( nrt->hash );
1326                         free( nrt );
1327                         nrt = NULL;
1328                 } else {
1329                         nrt->error = 1;
1330                 }
1331
1332                 return nrt;
1333         }
1334         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1335         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1336
1337         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1338         nst = nrt->hash;
1339
1340         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1341         if( things.error ) {                            // something happened and capture failed
1342                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1343                 free( things.things );
1344                 free( things.names );
1345                 if( free_on_err ) {
1346                         rmr_sym_free( nrt->hash );
1347                         free( nrt );
1348                         nrt = NULL;
1349                 } else {
1350                         nrt->error = 1;
1351                 }
1352                 return nrt;
1353         }
1354
1355         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1356         for( i = 0; i < things.nused; i++ ) {
1357                 if( space ) {                                                                                           // string key, epoint reference
1358                         ep = (endpoint_t *) things.things[i];
1359                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1360                 } else {
1361                         rte = (rtable_ent_t *) things.things[i];
1362                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1363                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1364                 }
1365         }
1366
1367         free( things.things );
1368         free( (void *) things.names );
1369         return nrt;
1370 }
1371
1372 /*
1373         Given a destination route table (drt), clone from the source (srt) into it.
1374         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1375         allocate the drt if that was nil too). If all is true (1), then we will clone both
1376         the MT and the ME spaces; otherwise only the ME space is cloned.
1377 */
1378 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1379         endpoint_t*             ep;                             // an endpoint
1380         rtable_ent_t*   rte;                    // a route table entry
1381         int i;
1382
1383         if( ctx == NULL ) {
1384                 return NULL;
1385         }
1386         if( drt == NULL ) {
1387                 drt = uta_rt_init( ctx );
1388         }
1389         if( srt == NULL ) {
1390                 return drt;
1391         }
1392
1393         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1394         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1395         if( all ) {
1396                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1397         }
1398
1399         return drt;
1400 }
1401
1402 /*
1403         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1404         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1405         context to be referenced by the new pointer; the old pointer is set to nil.
1406
1407         If the old table doesn't exist, then a new table is created and the new pointer is
1408         set to reference it.
1409
1410         The ME namespace references endpoints which do not need to be released, therefore we
1411         do not need to run that portion of the table to deref like we do for the RTEs.
1412 */
1413 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1414         int counter = 0;
1415         route_table_t*  rt;
1416
1417         if( ctx == NULL ) {
1418                 return NULL;
1419         }
1420
1421         if( (rt = ctx->old_rtable) != NULL ) {
1422                 ctx->old_rtable = NULL;
1423                 while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
1424                         if( counter++ > 1000 ) {
1425                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1426                                 break;
1427                         }
1428
1429                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1430                 }
1431
1432                 if( rt->hash != NULL ) {
1433                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1434                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1435                 }
1436
1437                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1438         } else {
1439                 rt = NULL;
1440         }
1441
1442         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1443         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1444                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1445         } else {
1446                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1447                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1448                 rt->error = 1;
1449         }
1450
1451         return rt;
1452 }
1453
1454
1455 /*
1456         Given a name, find the endpoint struct in the provided route table.
1457 */
1458 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1459
1460         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1461                 return NULL;
1462         }
1463
1464         return rmr_sym_get( rt->ephash, ep_name, 1 );
1465 }
1466
1467 /*
1468         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1469         Does NOT destroy the gate as it's a common gate for ALL route tables.
1470 */
1471 static void uta_rt_drop( route_table_t* rt ) {
1472         if( rt == NULL ) {
1473                 return;
1474         }
1475
1476         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1477         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1478         free( rt );
1479 }
1480
1481 /*
1482         Look up and return the pointer to the endpoint stuct matching the given name.
1483         If not in the hash, a new endpoint is created, added to the hash. Should always
1484         return a pointer.
1485 */
1486 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1487         endpoint_t*     ep;
1488
1489         if( !rt || !ep_name || ! *ep_name ) {
1490                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1491                 errno = EINVAL;
1492                 return NULL;
1493         }
1494
1495         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1496                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1497                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1498                         errno = ENOMEM;
1499                         return NULL;
1500                 }
1501
1502                 ep->notify = 1;                                                         // show notification on first connection failure
1503                 ep->open = 0;                                                           // not connected
1504                 ep->addr = uta_h2ip( ep_name );
1505                 ep->name = strdup( ep_name );
1506                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1507                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1508
1509                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1510         }
1511
1512         return ep;
1513 }
1514
1515
1516 /*
1517         Given a session id and message type build a key that can be used to look up the rte in the route
1518         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1519 */
1520 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1521         uint64_t key;
1522
1523         if( sub_id == UNSET_SUBID ) {
1524                 key = 0xffffffff00000000 | mtype;
1525         } else {
1526                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1527         }
1528
1529         return key;
1530 }
1531
1532 /*
1533         Given a route table and meid string, find the owner (if known). Returns a pointer to
1534         the endpoint struct or nil.
1535 */
1536 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1537         endpoint_t const* ep;           // the ep we found in the hash
1538
1539         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1540                 return NULL;
1541         }
1542
1543         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1544 }
1545
1546 /*
1547         This returns a pointer to the currently active route table and ups
1548         the reference count so that the route table is not freed while it
1549         is being used. The caller MUST call release_rt() when finished
1550         with the pointer.
1551
1552         Care must be taken: the ctx->rtable pointer _could_ change during the time
1553         between the release of the lock and the return. Therefore we MUST grab
1554         the current pointer when we have the lock so that if it does we don't
1555         return a pointer to the wrong table.
1556
1557         This will return NULL if there is no active table.
1558 */
1559 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1560         route_table_t*  rrt;                    // return value
1561
1562         if( ctx == NULL || ctx->rtable == NULL ) {
1563                 return NULL;
1564         }
1565
1566         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1567         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1568         rrt->ref_count++;
1569         pthread_mutex_unlock( ctx->rtgate );
1570
1571         return rrt;                                                                             // pointer we upped the count with
1572 }
1573
1574 /*
1575         This will "release" the route table by reducing the use counter
1576         in the table. The table may not be freed until the counter reaches
1577         0, so it's imparative that the pointer be "released" when it is
1578         fetched by get_rt().  Once the caller has released the table it
1579         may not safely use the pointer that it had.
1580 */
1581 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1582         if( ctx == NULL || rt == NULL ) {
1583                 return;
1584         }
1585
1586         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1587         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1588                 rt->ref_count--;
1589         }
1590         pthread_mutex_unlock( ctx->rtgate );
1591 }
1592 #endif