Ensure RT incremental update not applied early
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49
50 #include <RIC_message_types.h>          // needed for route manager messages
51
52 #define ALL 1
53 #define SOME 0
54
55 /*
56         Passed to a symtab foreach callback to construct a list of pointers from
57         a current symtab.
58 */
59 typedef struct thing_list {
60         int error;                                      // if a realloc failed, this will be set
61         int nalloc;
62         int nused;
63         void** things;
64         const char** names;
65 } thing_list_t;
66
67 // ---- debugging/testing -------------------------------------------------------------------------
68
69 /*
70         Dump some stats for an endpoint in the RT. This is generally called to
71         verify endpoints after a table load/change.
72
73         This is called by the for-each mechanism of the symtab and the prototype is
74         fixe; we don't really use some of the parms, but have dummy references to
75         keep sonar from complaining.
76 */
77 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
78         int*    counter;
79         endpoint_t* ep;
80
81         if( (ep = (endpoint_t *) thing) == NULL ) {
82                 return;
83         }
84
85         if( (counter = (int *) vcounter) != NULL ) {
86                 (*counter)++;
87         } else {
88                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
89                 return;
90         }
91
92         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
93 }
94
95 /*
96         Called to count meid entries in the table. The meid points to an 'owning' endpoint
97         so we can list what we find
98
99         See note in ep_stats about dummy refs.
100 */
101 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
102         int*    counter;
103         endpoint_t* ep;
104
105         if( (ep = (endpoint_t *) thing) == NULL ) {
106                 return;
107         }
108
109         if( (counter = (int *) vcounter) != NULL ) {
110                 (*counter)++;
111         } else {
112                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
113         }
114
115         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
116 }
117
118 /*
119         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
120         the 'source' information and is added to each message.
121
122         See note above about dummy references.
123 */
124 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
125         endpoint_t* ep;
126         char*   id;
127
128         if( (ep = (endpoint_t *) thing) == NULL ) {
129                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
130                 return;
131         }
132
133         if( (id = (char *) vid) == NULL ) {
134                 id = "missing";
135         }
136
137         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
138                 (long long) time( NULL ),
139                 id,
140                 ep->name,
141                 ep->open,
142                 ep->scounts[EPSC_GOOD],
143                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
144                 ep->scounts[EPSC_FAIL],
145                 ep->scounts[EPSC_TRANS]   );
146 }
147
148 /*
149         Dump stats for a route entry in the table.
150 */
151 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
152         int*    counter;
153         rtable_ent_t const* rte;                // thing is really an rte
154         int             mtype;
155         int             sid;
156
157         if( (rte = (rtable_ent_t *) thing) == NULL ) {
158                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
159                 return;
160         }
161
162         if( (counter = (int *) vcounter) != NULL ) {
163                 (*counter)++;
164         }
165
166         mtype = rte->key & 0xffff;
167         sid = (int) (rte->key >> 32);
168
169         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
170 }
171
172 /*
173         Given a route table, cause some stats to be spit out.
174 */
175 static void  rt_stats( route_table_t* rt ) {
176         int* counter;
177
178         if( rt == NULL ) {
179                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
180                 return;
181         }
182
183         counter = (int *) malloc( sizeof( int ) );
184         *counter = 0;
185         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
186         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
187         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
188         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
189
190         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
191         *counter = 0;
192         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
193         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
194
195         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
196         *counter = 0;
197         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
198         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
199
200         free( counter );
201 }
202
203 /*
204         Given a route table, cause endpoint counters to be written to stderr. The id
205         parm is written as the "source" in the output.
206 */
207 static void  rt_epcounts( route_table_t* rt, char* id ) {
208         if( rt == NULL ) {
209                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
210                 return;
211         }
212
213         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
214 }
215
216
217 static void dump_tables( uta_ctx_t *ctx ) {
218         if( ctx->old_rtable != NULL ) {
219                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
220                 rt_stats( ctx->old_rtable );
221         } else {
222                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
223         }
224         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
225         rt_stats( ctx->rtable );
226 }
227
228 // ------------ route manager communication -------------------------------------------------
229 /*
230         Send a request for a table update to the route manager. Updates come in
231         async, so send and go.
232
233         pctx is the private context for the thread; ctx is the application context
234         that we need to be able to send the application ID in case rt mgr needs to
235         use it to idenfity us.
236
237         Returns 0 if we were not able to send a request.
238 */
239 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
240         rmr_mbuf_t*     smsg;
241         int     state = 0;
242
243         if( ctx->rtg_whid < 0 ) {
244                 return state;
245         }
246
247         smsg = rmr_alloc_msg( pctx, 1024 );
248         if( smsg != NULL ) {
249                 smsg->mtype = RMRRM_REQ_TABLE;
250                 smsg->sub_id = 0;
251                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
252                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
253                 smsg->len = strlen( smsg->payload ) + 1;
254
255                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
256                 if( (state = smsg->state) != RMR_OK ) {
257                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
258                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
259                         ctx->rtg_whid = -1;
260                 }
261
262                 rmr_free_msg( smsg );
263         }
264
265         return state;
266 }
267
268 /*
269         Send an ack to the route table manager for a table ID that we are
270         processing.      State is 1 for OK, and 0 for failed. Reason might
271         be populated if we know why there was a failure.
272
273         Context should be the PRIVATE context that we use for messages
274         to route manger and NOT the user's context.
275
276         If a message buffere is passed we use that and use return to sender
277         assuming that this might be a response to a call and that is needed
278         to send back to the proper calling thread. If msg is nil, we allocate
279         and use it.
280 */
281 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
282         int             use_rts = 1;
283         int             payload_size = 1024;
284
285         if( ctx == NULL || ctx->rtg_whid < 0 ) {
286                 return;
287         }
288
289         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
290                 return;
291         }
292
293         if( smsg != NULL ) {
294                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
295         } else {
296                 use_rts = 0;
297                 smsg = rmr_alloc_msg( ctx, payload_size );
298         }
299
300         if( smsg != NULL ) {
301                 smsg->mtype = RMRRM_TABLE_STATE;
302                 smsg->sub_id = -1;
303                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
304                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
305
306                 smsg->len = strlen( smsg->payload ) + 1;
307
308                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
309                 if( use_rts ) {
310                         smsg = rmr_rts_msg( ctx, smsg );
311                 } else {
312                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
313                 }
314                 if( (state = smsg->state) != RMR_OK ) {
315                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
316                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
317                         ctx->rtg_whid = -1;
318                 }
319
320                 if( ! use_rts ) {
321                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
322                 }
323         }
324 }
325
326 // ---- alarm generation --------------------------------------------------------------------------
327
328 /*
329         Given the user's context (not the thread private context) look to see if the application isn't
330         working fast enough and we're dropping messages. If the drop counter has changed since the last
331         peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
332         set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
333
334         The private context is what we use to send so as not to interfere with the user flow.
335 */
336 static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
337         static  int alarm_raised = 0;
338         static  int ok2clear = 0;                                       // time that we can clear
339         static  int lastd = 0;                                          // the last counter value so we can compute delta
340         static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
341
342         rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
343         if( ! alarm_raised ) {
344                 if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
345                         return;
346                 }
347
348                 alarm_raised = 1;
349                 uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
350                 rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
351         } else {
352                 if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
353                         lastd = uctx->dcount;
354                         ok2clear = 0;                                                   // reset the timer
355                         return;
356                 }
357
358                 if( ok2clear == 0 ) {                                           // first round where not dropping
359                         ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
360                 } else {
361                         if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
362                                 rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
363                                 alarm_raised = 0;
364                                 uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id++, "RMR message dropping has stopped" );
365                         }
366                 }
367         }
368 }
369
370 // ---- utility -----------------------------------------------------------------------------------
371 /*
372         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
373         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
374 */
375 static char* clip( char* buf ) {
376         char*   tok;
377
378         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
379                 buf++;
380         }
381
382         if( (tok = strchr( buf, '#' )) != NULL ) {
383                 if( tok == buf ) {
384                         return buf;                                     // just push back; leading comment sym handled there
385                 }
386
387                 if( isspace( *(tok-1) ) ) {
388                         *tok = 0;
389                 }
390         }
391
392         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
393         *(tok+1) = 0;
394
395         return buf;
396 }
397
398 /*
399         This accepts a pointer to a nil terminated string, and ensures that there is a
400         newline as the last character. If there is not, a new buffer is allocated and
401         the newline is added.  If a new buffer is allocated, the buffer passed in is
402         freed.  The function returns a pointer which the caller should use, and must
403         free.  In the event of an error, a nil pointer is returned.
404 */
405 static char* ensure_nlterm( char* buf ) {
406         char*   nb = NULL;
407         int             len = 0;
408
409         if( buf != NULL ) {
410                 len = strlen( buf );
411         }
412
413         nb = buf;                                                       // default to returning original as is
414         switch( len ) {
415                 case 0:
416                         nb = strdup( "\n" );
417                         break;
418
419                 case 1:
420                         if( *buf != '\n' ) {            // not a newline; realloc
421                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
422                                 nb = strdup( " \n" );
423                                 *nb = *buf;
424                                 free( buf );
425                         }
426                         break;
427
428                 default:
429                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
430                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
431                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
432                                         memcpy( nb, buf, len );
433                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
434                                         *(nb+len+1) = 0;
435                                         free( buf );
436                                 }
437                         }
438                         break;
439         }
440
441         return nb;
442 }
443
444 /*
445         Roll the new table into the active and the active into the old table. We
446         must have the lock on the active table to do this. It's possible that there
447         is no active table (first load), so we have to account for that (no locking).
448 */
449 static void roll_tables( uta_ctx_t* ctx ) {
450
451         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
452                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
453                 ctx->old_rtable = ctx->new_rtable;
454                 ctx->new_rtable = NULL;
455                 return;
456         }
457
458         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
459                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
460                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
461                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
462                 pthread_mutex_unlock( ctx->rtgate );
463         } else {
464                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
465                 ctx->rtable = ctx->new_rtable;                          // make new the active one
466         }
467
468         ctx->new_rtable = NULL;
469 }
470
471 /*
472         Given a thing list, extend the array of pointers by 1/2 of the current
473         number allocated. If we cannot realloc an array, then we set the error
474         flag.  Unlikely, but will prevent a crash, AND will prevent us from
475         trying to use the table since we couldn't capture everything.
476 */
477 static void extend_things( thing_list_t* tl ) {
478         int old_alloc;
479         void*   old_things;
480         void*   old_names;
481
482         if( tl == NULL ) {
483                 return;
484         }
485
486         old_alloc = tl->nalloc;                         // capture current things
487         old_things = tl->things;
488         old_names = tl->names;
489
490         tl->nalloc += tl->nalloc/2;                     // new allocation size
491
492         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
493         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
494
495         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
496                 tl->error = 1;
497                 return;
498         }
499
500         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
501         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
502
503         free( old_things );
504         free( old_names );
505 }
506
507 // ------------ entry update functions ---------------------------------------------------------------
508 /*
509         Given a message type create a route table entry and add to the hash keyed on the
510         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
511         is the number of group slots to allocate in the entry.
512 */
513 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
514         rtable_ent_t* rte;
515         rtable_ent_t* old_rte;          // entry which was already in the table for the key
516
517         if( rt == NULL ) {
518                 return NULL;
519         }
520
521         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
522                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
523                 return NULL;
524         }
525         memset( rte, 0, sizeof( *rte ) );
526         rte->refs = 1;
527         rte->key = key;
528
529         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
530                 nrrgroups = 10;
531         }
532
533         if( nrrgroups ) {
534                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
535                         free( rte );
536                         return NULL;
537                 }
538                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
539         } else {
540                 rte->rrgroups = NULL;
541         }
542
543         rte->nrrgroups = nrrgroups;
544
545         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
546                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
547         }
548
549         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
550
551         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
552         return rte;
553 }
554
555 /*
556         This accepts partially parsed information from an rte or mse record sent by route manager or read from
557         a file such that:
558                 ts_field is the msg-type,sender field
559                 subid is the integer subscription id
560                 rr_field is the endpoint information for round robening message over
561
562         If all goes well, this will add an RTE to the table under construction.
563
564         The ts_field is checked to see if we should ingest this record. We ingest if one of
565         these is true:
566                 there is no sender info (a generic entry for all)
567                 there is sender and our host:port matches one of the senders
568                 the sender info is an IP address that matches one of our IP addresses
569 */
570 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
571         rtable_ent_t*   rte;            // route table entry added
572         char const*     tok;
573         int             ntoks;
574         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
575         char*   tokens[128];
576         char*   gtokens[64];
577         int             i;
578         int             ngtoks;                         // number of tokens in the group list
579         int             grp;                            // index into group list
580         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
581         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
582
583         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
584         rr_field = clip( rr_field );
585
586         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
587                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
588                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
589
590                 key = build_rt_key( subid, atoi( ts_field ) );
591
592                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
593
594                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
595                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
596                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
597                         }
598                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
599                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
600
601                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
602                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
603                                         for( i = 0; i < ntoks; i++ ) {
604                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
605                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
606                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
607                                                         has_ep = TRUE;
608                                                 }
609                                         }
610                                         if( has_ep ) {
611                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
612                                                 has_ep = FALSE;
613                                         }
614                                 }
615                         }
616                 }
617         } else {
618                 if( DEBUG || (vlevel > 2) ) {
619                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
620                 }
621         }
622 }
623
624 /*
625         Trash_entry takes a partially parsed record from the input and
626         will delete the entry if the sender,mtype matches us or it's a
627         generic mtype. The refernce in the new table is removed and the
628         refcounter for the actual rte is decreased. If that ref count is
629         0 then the memory is freed (handled byh the del_rte call).
630 */
631 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
632         rtable_ent_t*   rte;            // route table entry to be 'deleted'
633         char const*     tok;
634         int             ntoks;
635         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
636         char*   tokens[128];
637
638         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
639                 return;
640         }
641
642         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
643
644         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
645                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
646                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
647
648                 key = build_rt_key( subid, atoi( ts_field ) );
649                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
650                 if( rte != NULL ) {
651                         if( DEBUG || (vlevel > 1) ) {
652                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
653                         }
654                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
655                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
656                 } else {
657                         if( DEBUG || (vlevel > 1) ) {
658                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
659                         }
660                 }
661         } else {
662                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
663         }
664 }
665
666 // -------------------------- parse functions --------------------------------------------------
667
668 /*
669         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
670         the 'owner' which should be the dns name or IP address of an enpoint
671         the meid_list is a space separated list of me IDs
672
673         This function assumes the caller has vetted the pointers as needed.
674
675         For each meid in the list, an entry is pushed into the hash which references the owner
676         endpoint such that when the meid is used to route a message it references the endpoint
677         to send messages to.
678 */
679 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
680         char const*     tok;
681         int             ntoks;
682         char*   tokens[128];
683         int             i;
684         int             state;
685         endpoint_t*     ep;                                             // endpoint struct for the owner
686
687         owner = clip( owner );                          // ditch extra whitespace and trailing comments
688         meid_list = clip( meid_list );
689
690         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
691         for( i = 0; i < ntoks; i++ ) {
692                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
693                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
694                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
695                 } else {
696                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
697                 }
698         }
699 }
700
701 /*
702         Given the tokens from an mme_del, delete the listed meid entries from the new
703         table. The list is a space separated list of meids.
704
705         The meids in the hash reference endpoints which are never deleted and so
706         the only thing that we need to do here is to remove the meid from the hash.
707
708         This function assumes the caller has vetted the pointers as needed.
709 */
710 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
711         char const*     tok;
712         int             ntoks;
713         char*   tokens[128];
714         int             i;
715
716         if( rtab->hash == NULL ) {
717                 return;
718         }
719
720         meid_list = clip( meid_list );
721
722         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
723         for( i = 0; i < ntoks; i++ ) {
724                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
725                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
726         }
727 }
728
729 /*
730         Parse a partially parsed meid record. Tokens[0] should be one of:
731                 meid_map, mme_ar, mme_del.
732
733         pctx is the private context needed to return an ack/nack using the provided
734         message buffer with the route managers address info.
735 */
736 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
737         char wbuf[1024];
738
739         if( tokens == NULL || ntoks < 1 ) {
740                 return;                                                 // silent but should never happen
741         }
742
743         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
744                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
745                 return;
746         }
747
748         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
749                 tokens[1] = clip( tokens[1] );
750                 if( *(tokens[1]) == 's' ) {
751                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
752                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
753                                 uta_rt_drop( ctx->new_rtable );
754                                 ctx->new_rtable = NULL;
755                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
756                         }
757
758                         if( ctx->table_id != NULL ) {
759                                 free( ctx->table_id );
760                         }
761                         if( ntoks > 2 ) {
762                                 ctx->table_id = strdup( clip( tokens[2] ) );
763                         } else {
764                                 ctx->table_id = NULL;
765                         }
766
767                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
768                         ctx->new_rtable->mupdates = 0;
769
770                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
771                 } else {
772                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
773                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
774                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
775                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
776                                                                 ctx->new_rtable->mupdates, tokens[2] );
777                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
778                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
779                                                 uta_rt_drop( ctx->new_rtable );
780                                                 ctx->new_rtable = NULL;
781                                                 return;
782                                         }
783
784                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
785                                 }
786
787                                 if( ctx->new_rtable ) {
788                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
789                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
790                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
791
792                                         if( vlevel > 0 ) {
793                                                 if( ctx->old_rtable != NULL ) {
794                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
795                                                         rt_stats( ctx->old_rtable );
796                                                 } else {
797                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
798                                                 }
799                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
800                                                 rt_stats( ctx->rtable );
801                                         }
802                                 } else {
803                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
804                                         ctx->new_rtable = NULL;
805                                 }
806                         }
807                 }
808
809                 return;
810         }
811
812         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
813                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
814                 return;
815         }
816
817         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
818                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
819                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
820                         return;
821                 }
822                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
823                 ctx->new_rtable->mupdates++;
824                 return;
825         }
826
827         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
828                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
829                 ctx->new_rtable->mupdates++;
830                 return;
831         }
832 }
833
834 /*
835         This will close the current table snarf file (in *.inc) and open a new one.
836         The curent one is renamed. The final file name is determined by the setting of
837         RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
838         an additional extension of .snarf.  If neither seed or snarf environment vars are
839         set then this does nothing.
840
841         If this is called before the tmp snarf file is opened, then this just opens the file.
842 */
843 static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
844         static int              ok2warn = 0;    // some warnings squelched on first call
845
846         char*   seed_fname;                             // the filename from env
847         char    tfname[512];                    // temp fname
848         char    wfname[512];                    // working buffer for filename
849         char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
850
851         if( ctx == NULL ) {
852                 return;
853         }
854
855         if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
856                 if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
857                         memset( wfname, 0, sizeof( wfname ) );
858                         snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
859                         snarf_fname = wfname;
860                 }
861         }
862
863         if( snarf_fname == NULL ) {
864                 rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
865                 return;
866         }
867
868         memset( tfname, 0, sizeof( tfname ) );
869         snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
870
871         if( ctx->snarf_rt_fd >= 0 ) {
872                 char* msg= "### captured from route manager\n";
873                 write( ctx->snarf_rt_fd, msg, strlen( msg ) );
874                 if( close( ctx->snarf_rt_fd ) < 0 ) {
875                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
876                         return;
877                 }
878
879                 if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
880                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
881                 }
882
883                 if( rename( tfname, snarf_fname ) ) {
884                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
885                 } else {
886                         rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
887                 }
888         }
889         ok2warn = 1;
890
891         ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
892         if( ctx->snarf_rt_fd < 0 ) {
893                 rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
894         } else {
895                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
896         }
897 }
898
899 /*
900         Parse a single record recevied from the route table generator, or read
901         from a static route table file.  Start records cause a new table to
902         be started (if a partial table was received it is discarded. Table
903         entry records are added to the currenly 'in progress' table, and an
904         end record causes the in progress table to be finalised and the
905         currently active table is replaced.
906
907         The updated table will be activated when the *|end record is encountered.
908         However, to allow for a "double" update, where both the meid map and the
909         route table must be updated at the same time, the end indication on a
910         route table (new or update) may specifiy "hold" which indicates that meid
911         map entries are to follow and the updated route table should be held as
912         pending until the end of the meid map is received and validated.
913
914         CAUTION:  we are assuming that there is a single route/meid map generator
915                 and as such only one type of update is received at a time; in other
916                 words, the sender cannot mix update records and if there is more than
917                 one sender process they must synchronise to avoid issues.
918
919
920         For a RT update, we expect:
921                 newrt | start | <table-id>
922                 newrt | end | <count>
923                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
924                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
925                 mse| <mtype>[,sender] | <sub-id> | %meid
926
927
928         For a meid map update we expect:
929                 meid_map | start | <table-id>
930                 meid_map | end | <count> | <md5-hash>
931                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
932                 mme_del | <meid0> <meid1>...<meidn>
933
934
935         The pctx is our private context that must be used to send acks/status
936         messages back to the route manager.  The regular ctx is the ctx that
937         the user has been given and thus that's where we have to hang the route
938         table we're working with.
939
940         If mbuf is given, and we need to ack, then we ack using the mbuf and a
941         return to sender call (allows route manager to use wh_call() to send
942         an update and rts is required to get that back to the right thread).
943         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
944         will be used.
945 */
946 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
947         int i;
948         int ntoks;                                                      // number of tokens found in something
949         int ngtoks;
950         int     grp;                                                    // group number
951         rtable_ent_t const*     rte;                    // route table entry added
952         char*   tokens[128];
953         char*   tok;                                            // pointer into a token or string
954         char    wbuf[1024];
955
956         if( ! buf ) {
957                 return;
958         }
959
960         if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
961                 write( ctx->snarf_rt_fd, buf, strlen( buf ) );
962                 write( ctx->snarf_rt_fd, "\n", 1 );
963         }
964
965         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
966                 buf++;
967         }
968         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
969         *(tok+1) = 0;
970
971         memset( tokens, 0, sizeof( tokens ) );
972         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
973                 tokens[0] = clip( tokens[0] );
974                 switch( *(tokens[0]) ) {
975                         case 0:                                                                                                 // ignore blanks
976                                 // fallthrough
977                         case '#':                                                                                               // and comment lines
978                                 break;
979
980                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
981                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
982                                         break;
983                                 }
984
985                                 if( ntoks < 3 ) {
986                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
987                                         break;
988                                 }
989
990                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
991                                 ctx->new_rtable->updates++;
992                                 break;
993
994                         case 'n':                                                                                               // newrt|{start|end}
995                                 tokens[1] = clip( tokens[1] );
996                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
997                                         if( ctx && ctx->snarf_rt_fd >= 0 ) {
998                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
999                                         }
1000
1001                                         if( ntoks >2 ) {
1002                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1003                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1004                                                                 ctx->new_rtable->updates, tokens[2] );
1005                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
1006                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1007                                                         uta_rt_drop( ctx->new_rtable );
1008                                                         ctx->new_rtable = NULL;
1009                                                         break;
1010                                                 }
1011                                         }
1012
1013                                         if( ctx->new_rtable ) {
1014                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1015                                                 if( DEBUG > 1 || (vlevel > 1) ) {
1016                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
1017                                                         dump_tables( ctx );
1018                                                 }
1019
1020                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1021                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1022                                                 ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
1023                                         } else {
1024                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
1025                                                 ctx->new_rtable = NULL;
1026                                         }
1027                                 } else {                                                                                                                        // start a new table.
1028                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
1029                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1030
1031                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1032                                                 uta_rt_drop( ctx->new_rtable );
1033                                                 ctx->new_rtable = NULL;
1034                                         }
1035
1036                                         if( ctx->table_id != NULL ) {
1037                                                 free( ctx->table_id );
1038                                         }
1039                                         if( ntoks >2 ) {
1040                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1041                                         } else {
1042                                                 ctx->table_id = NULL;
1043                                         }
1044
1045                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
1046                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
1047
1048                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
1049                                 }
1050                                 break;
1051
1052                         case 'm':                                                                       // mse entry or one of the meid_ records
1053                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
1054                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1055                                                 break;
1056                                         }
1057
1058                                         if( ntoks < 4 ) {
1059                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
1060                                                 break;
1061                                         }
1062
1063                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
1064                                         ctx->new_rtable->updates++;
1065                                 } else {
1066                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
1067                                 }
1068                                 break;
1069
1070                         case 'r':                                       // assume rt entry
1071                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
1072                                         break;
1073                                 }
1074
1075                                 ctx->new_rtable->updates++;
1076                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
1077                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
1078                                 } else {
1079                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
1080                                 }
1081                                 break;
1082
1083                         case 'u':                                                                                               // update current table, not a total replacement
1084                                 if( ! ctx->flags & CFL_FULLRT ) {                                       // we cannot update until we have a full table from route generator
1085                                         rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
1086                                         break;
1087                                 }
1088
1089                                 tokens[1] = clip( tokens[1] );
1090                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
1091                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
1092                                                 break;
1093                                         }
1094                                         if( ctx->snarf_rt_fd >= 0 ) {
1095                                                 cycle_snarfed_rt( ctx );                                        // make it available and open a new one
1096                                         }
1097
1098                                         if( ntoks >2 ) {
1099                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
1100                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
1101                                                                 ctx->new_rtable->updates, tokens[2] );
1102                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
1103                                                         uta_rt_drop( ctx->new_rtable );
1104                                                         ctx->new_rtable = NULL;
1105                                                         break;
1106                                                 }
1107                                         }
1108
1109                                         if( ctx->new_rtable ) {
1110                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
1111                                                 if( DEBUG > 1 || (vlevel > 1) )  {
1112                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
1113                                                         dump_tables( ctx );
1114                                                 }
1115
1116                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
1117                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
1118                                         } else {
1119                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
1120                                                 ctx->new_rtable = NULL;
1121                                         }
1122                                 } else {                                                                                        // start a new table.
1123                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
1124                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
1125                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
1126                                                 uta_rt_drop( ctx->new_rtable );
1127                                                 ctx->new_rtable = NULL;
1128                                         }
1129
1130                                         if( ntoks > 2 ) {
1131                                                 if( ctx->table_id != NULL ) {
1132                                                         free( ctx->table_id );
1133                                                 }
1134                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1135                                         }
1136
1137                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1138                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1139
1140                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1141                                 }
1142                                 break;
1143
1144                         default:
1145                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1146                                 break;
1147                 }
1148         }
1149 }
1150
1151 /*
1152         This function attempts to open a static route table in order to create a 'seed'
1153         table during initialisation.  The environment variable RMR_SEED_RT is expected
1154         to contain the necessary path to the file. If missing, or if the file is empty,
1155         no route table will be available until one is received from the generator.
1156
1157         This function is probably most useful for testing situations, or extreme
1158         cases where the routes are static.
1159 */
1160 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1161         int             i;
1162         char*   fname;
1163         char*   fbuf;                           // buffer with file contents
1164         char*   rec;                            // start of the record
1165         char*   eor;                            // end of the record
1166         int             rcount = 0;                     // record count for debug
1167
1168         if( (fname = ctx->seed_rt_fname) == NULL ) {
1169                 if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1170                         return;
1171                 }
1172
1173                 ctx->seed_rt_fname = strdup( fname );
1174                 fname = ctx->seed_rt_fname;
1175         }
1176
1177         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1178                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1179                 return;
1180         }
1181
1182         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1183         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1184                 if( *eor == '\r' ) {
1185                         *eor = '\n';                                                            // will look like a blank line which is ok
1186                 }
1187         }
1188
1189         rec = fbuf;
1190         while( rec && *rec ) {
1191                 rcount++;
1192                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1193                         *eor = 0;
1194                 } else {
1195                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1196                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1197                         free( fbuf );
1198                         return;
1199                 }
1200
1201                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1202
1203                 rec = eor+1;
1204         }
1205
1206         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1207         free( fbuf );
1208 }
1209
1210 /*
1211         Callback driven for each thing in a symtab. We collect the pointers to those
1212         things for later use (cloning).
1213 */
1214 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1215         thing_list_t*   tl;
1216
1217         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1218                 return;
1219         }
1220
1221         if( thing == NULL ) {
1222                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1223                 return;
1224         }
1225
1226         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1227         tl->things[tl->nused++] = thing;                // save a reference to the thing
1228
1229         if( tl->nused >= tl->nalloc ) {
1230                 extend_things( tl );                            // not enough allocated
1231         }
1232 }
1233
1234 /*
1235         Called to delete a route table entry struct. We delete the array of endpoint
1236         pointers, but NOT the endpoints referenced as those are referenced from
1237         multiple entries.
1238
1239         Route table entries can be concurrently referenced by multiple symtabs, so
1240         the actual delete happens only if decrementing the rte's ref count takes it
1241         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1242         the symtab, or overlaying an entry.
1243
1244         This function uses ONLY the pointer to the rte (thing) and ignores the other
1245         information that symtab foreach function passes (st, entry, and data) which
1246         means that it _can_ safetly be used outside of the foreach setting. If
1247         the function is changed to depend on any of these three, then a stand-alone
1248         rte_cleanup() function should be added and referenced by this, and refererences
1249         to this outside of the foreach world should be changed.
1250 */
1251 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1252         rtable_ent_t*   rte;
1253         int i;
1254
1255         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1256                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1257                 return;
1258         }
1259
1260         rte->refs--;
1261         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1262                 return;
1263         }
1264
1265         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1266                 for( i = 0; i < rte->nrrgroups; i++ ) {
1267                         if( rte->rrgroups[i] ) {
1268                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1269                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1270                         }
1271
1272                 }
1273
1274                 free( rte->rrgroups );
1275         }
1276
1277         free( rte );                                                                                    // finally, drop the potato
1278 }
1279
1280 /*
1281         Read an entire file into a buffer. We assume for route table files
1282         they will be smallish and so this won't be a problem.
1283         Returns a pointer to the buffer, or nil. Caller must free.
1284         Terminates the buffer with a nil character for string processing.
1285
1286         If we cannot stat the file, we assume it's empty or missing and return
1287         an empty buffer, as opposed to a nil, so the caller can generate defaults
1288         or error if an empty/missing file isn't tolerated.
1289 */
1290 static char* uta_fib( char const* fname ) {
1291         struct stat     stats;
1292         off_t           fsize = 8192;   // size of the file
1293         off_t           nread;                  // number of bytes read
1294         int                     fd;
1295         char*           buf;                    // input buffer
1296
1297         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1298                 if( fstat( fd, &stats ) >= 0 ) {
1299                         if( stats.st_size <= 0 ) {                                      // empty file
1300                                 close( fd );
1301                                 fd = -1;
1302                         } else {
1303                                 fsize = stats.st_size;                                          // stat ok, save the file size
1304                         }
1305                 } else {
1306                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1307                 }
1308         }
1309
1310         if( fd < 0 ) {                                                                                  // didn't open or empty
1311                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1312                         return NULL;
1313                 }
1314
1315                 *buf = 0;
1316                 return buf;
1317         }
1318
1319         // add a size limit check here
1320
1321         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1322                 close( fd );
1323                 errno = ENOMEM;
1324                 return NULL;
1325         }
1326
1327         nread = read( fd, buf, fsize );
1328         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1329                 free( buf );
1330                 errno = EFBIG;                                                                                  // likely too much to handle
1331                 close( fd );
1332                 return NULL;
1333         }
1334
1335         buf[nread] = 0;
1336
1337         close( fd );
1338         return buf;
1339 }
1340
1341 // --------------------- initialisation/creation ---------------------------------------------
1342 /*
1343         Create and initialise a route table; Returns a pointer to the table struct.
1344 */
1345 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1346         route_table_t*  rt;
1347
1348         if( ctx == NULL ) {
1349                 return NULL;
1350         }
1351         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1352                 return NULL;
1353         }
1354
1355         memset( rt, 0, sizeof( *rt ) );
1356
1357         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1358                 free( rt );
1359                 return NULL;
1360         }
1361
1362         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1363         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1364         pthread_mutex_init( rt->gate, NULL );
1365
1366         return rt;
1367 }
1368
1369 /*
1370         Clones one of the spaces in the given table.
1371         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1372         Space is the space in the old table to copy. Space 0 uses an integer key and
1373         references rte structs. All other spaces use a string key and reference endpoints.
1374 */
1375 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1376         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1377         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1378         void*   sst;                    // source symtab
1379         void*   nst;                    // new symtab
1380         thing_list_t things;    // things from the space to copy
1381         int             i;
1382         int             free_on_err = 0;
1383
1384         if( ctx == NULL ) {
1385                 return NULL;
1386         }
1387         if( nrt == NULL ) {                             // make a new table if needed
1388                 free_on_err = 1;
1389                 nrt = uta_rt_init( ctx );
1390                 if( nrt == NULL ) {
1391                         return NULL;
1392                 }
1393         }
1394
1395         if( srt == NULL ) {             // source was nil, just give back the new table
1396                 return nrt;
1397         }
1398
1399         things.nalloc = 2048;
1400         things.nused = 0;
1401         things.error = 0;
1402         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1403         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1404         if( things.things == NULL || things.names == NULL ){
1405                 if( things.things != NULL ) {
1406                         free( things.things );
1407                 }
1408                 if( things.names != NULL ) {
1409                         free( things.names );
1410                 }
1411
1412                 if( free_on_err ) {
1413                         rmr_sym_free( nrt->hash );
1414                         free( nrt );
1415                         nrt = NULL;
1416                 } else {
1417                         nrt->error = 1;
1418                 }
1419
1420                 return nrt;
1421         }
1422         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1423         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1424
1425         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1426         nst = nrt->hash;
1427
1428         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1429         if( things.error ) {                            // something happened and capture failed
1430                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1431                 free( things.things );
1432                 free( things.names );
1433                 if( free_on_err ) {
1434                         rmr_sym_free( nrt->hash );
1435                         free( nrt );
1436                         nrt = NULL;
1437                 } else {
1438                         nrt->error = 1;
1439                 }
1440                 return nrt;
1441         }
1442
1443         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1444         for( i = 0; i < things.nused; i++ ) {
1445                 if( space ) {                                                                                           // string key, epoint reference
1446                         ep = (endpoint_t *) things.things[i];
1447                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1448                 } else {
1449                         rte = (rtable_ent_t *) things.things[i];
1450                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1451                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1452                 }
1453         }
1454
1455         free( things.things );
1456         free( (void *) things.names );
1457         return nrt;
1458 }
1459
1460 /*
1461         Given a destination route table (drt), clone from the source (srt) into it.
1462         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1463         allocate the drt if that was nil too). If all is true (1), then we will clone both
1464         the MT and the ME spaces; otherwise only the ME space is cloned.
1465 */
1466 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1467         endpoint_t*             ep;                             // an endpoint
1468         rtable_ent_t*   rte;                    // a route table entry
1469         int i;
1470
1471         if( ctx == NULL ) {
1472                 return NULL;
1473         }
1474         if( drt == NULL ) {
1475                 drt = uta_rt_init( ctx );
1476         }
1477         if( srt == NULL ) {
1478                 return drt;
1479         }
1480
1481         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1482         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1483         if( all ) {
1484                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1485         }
1486
1487         return drt;
1488 }
1489
1490 /*
1491         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1492         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1493         context to be referenced by the new pointer; the old pointer is set to nil.
1494
1495         If the old table doesn't exist, then a new table is created and the new pointer is
1496         set to reference it.
1497
1498         The ME namespace references endpoints which do not need to be released, therefore we
1499         do not need to run that portion of the table to deref like we do for the RTEs.
1500 */
1501 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1502         int counter = 0;
1503         route_table_t*  rt;
1504
1505         if( ctx == NULL ) {
1506                 return NULL;
1507         }
1508
1509         if( (rt = ctx->old_rtable) != NULL ) {
1510                 ctx->old_rtable = NULL;
1511                 while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
1512                         if( counter++ > 1000 ) {
1513                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1514                                 break;
1515                         }
1516
1517                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1518                 }
1519
1520                 if( rt->hash != NULL ) {
1521                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1522                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1523                 }
1524
1525                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1526         } else {
1527                 rt = NULL;
1528         }
1529
1530         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1531         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1532                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1533         } else {
1534                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1535                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1536                 rt->error = 1;
1537         }
1538
1539         return rt;
1540 }
1541
1542
1543 /*
1544         Given a name, find the endpoint struct in the provided route table.
1545 */
1546 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1547
1548         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1549                 return NULL;
1550         }
1551
1552         return rmr_sym_get( rt->ephash, ep_name, 1 );
1553 }
1554
1555 /*
1556         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1557         Does NOT destroy the gate as it's a common gate for ALL route tables.
1558 */
1559 static void uta_rt_drop( route_table_t* rt ) {
1560         if( rt == NULL ) {
1561                 return;
1562         }
1563
1564         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1565         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1566         free( rt );
1567 }
1568
1569 /*
1570         Look up and return the pointer to the endpoint stuct matching the given name.
1571         If not in the hash, a new endpoint is created, added to the hash. Should always
1572         return a pointer.
1573 */
1574 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1575         endpoint_t*     ep;
1576
1577         if( !rt || !ep_name || ! *ep_name ) {
1578                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1579                 errno = EINVAL;
1580                 return NULL;
1581         }
1582
1583         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1584                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1585                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1586                         errno = ENOMEM;
1587                         return NULL;
1588                 }
1589
1590                 ep->notify = 1;                                                         // show notification on first connection failure
1591                 ep->open = 0;                                                           // not connected
1592                 ep->addr = uta_h2ip( ep_name );
1593                 ep->name = strdup( ep_name );
1594                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1595                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1596
1597                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1598         }
1599
1600         return ep;
1601 }
1602
1603
1604 /*
1605         Given a session id and message type build a key that can be used to look up the rte in the route
1606         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1607 */
1608 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1609         uint64_t key;
1610
1611         if( sub_id == UNSET_SUBID ) {
1612                 key = 0xffffffff00000000 | mtype;
1613         } else {
1614                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1615         }
1616
1617         return key;
1618 }
1619
1620 /*
1621         Given a route table and meid string, find the owner (if known). Returns a pointer to
1622         the endpoint struct or nil.
1623 */
1624 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1625         endpoint_t const* ep;           // the ep we found in the hash
1626
1627         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1628                 return NULL;
1629         }
1630
1631         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1632 }
1633
1634 /*
1635         This returns a pointer to the currently active route table and ups
1636         the reference count so that the route table is not freed while it
1637         is being used. The caller MUST call release_rt() when finished
1638         with the pointer.
1639
1640         Care must be taken: the ctx->rtable pointer _could_ change during the time
1641         between the release of the lock and the return. Therefore we MUST grab
1642         the current pointer when we have the lock so that if it does we don't
1643         return a pointer to the wrong table.
1644
1645         This will return NULL if there is no active table.
1646 */
1647 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1648         route_table_t*  rrt;                    // return value
1649
1650         if( ctx == NULL || ctx->rtable == NULL ) {
1651                 return NULL;
1652         }
1653
1654         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1655         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1656         rrt->ref_count++;
1657         pthread_mutex_unlock( ctx->rtgate );
1658
1659         return rrt;                                                                             // pointer we upped the count with
1660 }
1661
1662 /*
1663         This will "release" the route table by reducing the use counter
1664         in the table. The table may not be freed until the counter reaches
1665         0, so it's imparative that the pointer be "released" when it is
1666         fetched by get_rt().  Once the caller has released the table it
1667         may not safely use the pointer that it had.
1668 */
1669 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1670         if( ctx == NULL || rt == NULL ) {
1671                 return;
1672         }
1673
1674         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1675         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1676                 rt->ref_count--;
1677         }
1678         pthread_mutex_unlock( ctx->rtgate );
1679 }
1680 #endif