Fix route table clone core dump
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49
50 #include <RIC_message_types.h>          // needed for route manager messages
51
52 #define ALL 1
53 #define SOME 0
54
55 /*
56         Passed to a symtab foreach callback to construct a list of pointers from
57         a current symtab.
58 */
59 typedef struct thing_list {
60         int error;                                      // if a realloc failed, this will be set
61         int nalloc;
62         int nused;
63         void** things;
64         const char** names;
65 } thing_list_t;
66
67 // ---- debugging/testing -------------------------------------------------------------------------
68
69 /*
70         Dump some stats for an endpoint in the RT. This is generally called to
71         verify endpoints after a table load/change.
72
73         This is called by the for-each mechanism of the symtab and the prototype is
74         fixe; we don't really use some of the parms, but have dummy references to
75         keep sonar from complaining.
76 */
77 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
78         int*    counter;
79         endpoint_t* ep;
80
81         if( (ep = (endpoint_t *) thing) == NULL ) {
82                 return;
83         }
84
85         if( (counter = (int *) vcounter) != NULL ) {
86                 (*counter)++;
87         } else {
88                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
89                 return;
90         }
91
92         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
93 }
94
95 /*
96         Called to count meid entries in the table. The meid points to an 'owning' endpoint
97         so we can list what we find
98
99         See note in ep_stats about dummy refs.
100 */
101 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
102         int*    counter;
103         endpoint_t* ep;
104
105         if( (ep = (endpoint_t *) thing) == NULL ) {
106                 return;
107         }
108
109         if( (counter = (int *) vcounter) != NULL ) {
110                 (*counter)++;
111         } else {
112                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
113         }
114
115         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
116 }
117
118 /*
119         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
120         the 'source' information and is added to each message.
121
122         See note above about dummy references.
123 */
124 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
125         endpoint_t* ep;
126         char*   id;
127
128         if( (ep = (endpoint_t *) thing) == NULL ) {
129                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
130                 return;
131         }
132
133         if( (id = (char *) vid) == NULL ) {
134                 id = "missing";
135         }
136
137         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
138                 (long long) time( NULL ),
139                 id,
140                 ep->name,
141                 ep->open,
142                 ep->scounts[EPSC_GOOD],
143                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
144                 ep->scounts[EPSC_FAIL],
145                 ep->scounts[EPSC_TRANS]   );
146 }
147
148 /*
149         Dump stats for a route entry in the table.
150 */
151 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
152         int*    counter;
153         rtable_ent_t const* rte;                // thing is really an rte
154         int             mtype;
155         int             sid;
156
157         if( (rte = (rtable_ent_t *) thing) == NULL ) {
158                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
159                 return;
160         }
161
162         if( (counter = (int *) vcounter) != NULL ) {
163                 (*counter)++;
164         }
165
166         mtype = rte->key & 0xffff;
167         sid = (int) (rte->key >> 32);
168
169         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
170 }
171
172 /*
173         Given a route table, cause some stats to be spit out.
174 */
175 static void  rt_stats( route_table_t* rt ) {
176         int* counter;
177
178         if( rt == NULL ) {
179                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
180                 return;
181         }
182
183         counter = (int *) malloc( sizeof( int ) );
184         *counter = 0;
185         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
186         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
187         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
188         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
189
190         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
191         *counter = 0;
192         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
193         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
194
195         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
196         *counter = 0;
197         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
198         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
199
200         free( counter );
201 }
202
203 /*
204         Given a route table, cause endpoint counters to be written to stderr. The id
205         parm is written as the "source" in the output.
206 */
207 static void  rt_epcounts( route_table_t* rt, char* id ) {
208         if( rt == NULL ) {
209                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
210                 return;
211         }
212
213         rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
214 }
215
216
217 static void dump_tables( uta_ctx_t *ctx ) {
218         if( ctx->old_rtable != NULL ) {
219                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
220                 rt_stats( ctx->old_rtable );
221         } else {
222                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
223         }
224         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
225         rt_stats( ctx->rtable );
226 }
227
228 // ------------ route manager communication -------------------------------------------------
229 /*
230         Send a request for a table update to the route manager. Updates come in
231         async, so send and go.
232
233         pctx is the private context for the thread; ctx is the application context
234         that we need to be able to send the application ID in case rt mgr needs to
235         use it to idenfity us.
236
237         Returns 0 if we were not able to send a request.
238 */
239 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
240         rmr_mbuf_t*     smsg;
241         int     state = 0;
242
243         if( ctx->rtg_whid < 0 ) {
244                 return state;
245         }
246
247         smsg = rmr_alloc_msg( pctx, 1024 );
248         if( smsg != NULL ) {
249                 smsg->mtype = RMRRM_REQ_TABLE;
250                 smsg->sub_id = 0;
251                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
252                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
253                 smsg->len = strlen( smsg->payload ) + 1;
254
255                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
256                 if( (state = smsg->state) != RMR_OK ) {
257                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
258                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
259                         ctx->rtg_whid = -1;
260                 }
261
262                 rmr_free_msg( smsg );
263         }
264
265         return state;
266 }
267
268 /*
269         Send an ack to the route table manager for a table ID that we are
270         processing.      State is 1 for OK, and 0 for failed. Reason might
271         be populated if we know why there was a failure.
272
273         Context should be the PRIVATE context that we use for messages
274         to route manger and NOT the user's context.
275
276         If a message buffere is passed we use that and use return to sender
277         assuming that this might be a response to a call and that is needed
278         to send back to the proper calling thread. If msg is nil, we allocate
279         and use it.
280 */
281 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
282         int             use_rts = 1;
283         int             payload_size = 1024;
284
285         if( ctx == NULL || ctx->rtg_whid < 0 ) {
286                 return;
287         }
288
289         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
290                 return;
291         }
292
293         if( smsg != NULL ) {
294                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
295         } else {
296                 use_rts = 0;
297                 smsg = rmr_alloc_msg( ctx, payload_size );
298         }
299
300         if( smsg != NULL ) {
301                 smsg->mtype = RMRRM_TABLE_STATE;
302                 smsg->sub_id = -1;
303                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
304                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
305
306                 smsg->len = strlen( smsg->payload ) + 1;
307
308                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
309                 if( use_rts ) {
310                         smsg = rmr_rts_msg( ctx, smsg );
311                 } else {
312                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
313                 }
314                 if( (state = smsg->state) != RMR_OK ) {
315                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
316                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
317                         ctx->rtg_whid = -1;
318                 }
319
320                 if( ! use_rts ) {
321                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
322                 }
323         }
324 }
325
326 // ---- utility -----------------------------------------------------------------------------------
327 /*
328         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
329         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
330 */
331 static char* clip( char* buf ) {
332         char*   tok;
333
334         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
335                 buf++;
336         }
337
338         if( (tok = strchr( buf, '#' )) != NULL ) {
339                 if( tok == buf ) {
340                         return buf;                                     // just push back; leading comment sym handled there
341                 }
342
343                 if( isspace( *(tok-1) ) ) {
344                         *tok = 0;
345                 }
346         }
347
348         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
349         *(tok+1) = 0;
350
351         return buf;
352 }
353
354 /*
355         This accepts a pointer to a nil terminated string, and ensures that there is a
356         newline as the last character. If there is not, a new buffer is allocated and
357         the newline is added.  If a new buffer is allocated, the buffer passed in is
358         freed.  The function returns a pointer which the caller should use, and must
359         free.  In the event of an error, a nil pointer is returned.
360 */
361 static char* ensure_nlterm( char* buf ) {
362         char*   nb = NULL;
363         int             len = 0;
364
365         if( buf != NULL ) {
366                 len = strlen( buf );
367         }
368
369         nb = buf;                                                       // default to returning original as is
370         switch( len ) {
371                 case 0:
372                         nb = strdup( "\n" );
373                         break;
374
375                 case 1:
376                         if( *buf != '\n' ) {            // not a newline; realloc
377                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
378                                 nb = strdup( " \n" );
379                                 *nb = *buf;
380                                 free( buf );
381                         }
382                         break;
383
384                 default:
385                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
386                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
387                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
388                                         memcpy( nb, buf, len );
389                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
390                                         *(nb+len+1) = 0;
391                                         free( buf );
392                                 }
393                         }
394                         break;
395         }
396
397         return nb;
398 }
399
400 /*
401         Roll the new table into the active and the active into the old table. We
402         must have the lock on the active table to do this. It's possible that there
403         is no active table (first load), so we have to account for that (no locking).
404 */
405 static void roll_tables( uta_ctx_t* ctx ) {
406
407         if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
408                 rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
409                 ctx->old_rtable = ctx->new_rtable;
410                 ctx->new_rtable = NULL;
411                 return;
412         }
413
414         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
415                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
416                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
417                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
418                 pthread_mutex_unlock( ctx->rtgate );
419         } else {
420                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
421                 ctx->rtable = ctx->new_rtable;                          // make new the active one
422         }
423
424         ctx->new_rtable = NULL;
425 }
426
427 /*
428         Given a thing list, extend the array of pointers by 1/2 of the current
429         number allocated. If we cannot realloc an array, then we set the error
430         flag.  Unlikely, but will prevent a crash, AND will prevent us from
431         trying to use the table since we couldn't capture everything.
432 */
433 static void extend_things( thing_list_t* tl ) {
434         int old_alloc;
435         void*   old_things;
436         void*   old_names;
437
438         if( tl == NULL ) {
439                 return;
440         }
441
442         old_alloc = tl->nalloc;                         // capture current things
443         old_things = tl->things;
444         old_names = tl->names;
445
446         tl->nalloc += tl->nalloc/2;                     // new allocation size
447
448         tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
449         tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
450
451         if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
452                 tl->error = 1;
453                 return;
454         }
455
456         memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
457         memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
458
459         free( old_things );
460         free( old_names );
461 }
462
463 // ------------ entry update functions ---------------------------------------------------------------
464 /*
465         Given a message type create a route table entry and add to the hash keyed on the
466         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
467         is the number of group slots to allocate in the entry.
468 */
469 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
470         rtable_ent_t* rte;
471         rtable_ent_t* old_rte;          // entry which was already in the table for the key
472
473         if( rt == NULL ) {
474                 return NULL;
475         }
476
477         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
478                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
479                 return NULL;
480         }
481         memset( rte, 0, sizeof( *rte ) );
482         rte->refs = 1;
483         rte->key = key;
484
485         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
486                 nrrgroups = 10;
487         }
488
489         if( nrrgroups ) {
490                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
491                         free( rte );
492                         return NULL;
493                 }
494                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
495         } else {
496                 rte->rrgroups = NULL;
497         }
498
499         rte->nrrgroups = nrrgroups;
500
501         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
502                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
503         }
504
505         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
506
507         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
508         return rte;
509 }
510
511 /*
512         This accepts partially parsed information from an rte or mse record sent by route manager or read from
513         a file such that:
514                 ts_field is the msg-type,sender field
515                 subid is the integer subscription id
516                 rr_field is the endpoint information for round robening message over
517
518         If all goes well, this will add an RTE to the table under construction.
519
520         The ts_field is checked to see if we should ingest this record. We ingest if one of
521         these is true:
522                 there is no sender info (a generic entry for all)
523                 there is sender and our host:port matches one of the senders
524                 the sender info is an IP address that matches one of our IP addresses
525 */
526 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
527         rtable_ent_t*   rte;            // route table entry added
528         char const*     tok;
529         int             ntoks;
530         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
531         char*   tokens[128];
532         char*   gtokens[64];
533         int             i;
534         int             ngtoks;                         // number of tokens in the group list
535         int             grp;                            // index into group list
536         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
537         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
538
539         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
540         rr_field = clip( rr_field );
541
542         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
543                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
544                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
545
546                 key = build_rt_key( subid, atoi( ts_field ) );
547
548                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
549
550                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
551                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
552                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
553                         }
554                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
555                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
556
557                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
558                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
559                                         for( i = 0; i < ntoks; i++ ) {
560                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
561                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
562                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
563                                                         has_ep = TRUE;
564                                                 }
565                                         }
566                                         if( has_ep ) {
567                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
568                                                 has_ep = FALSE;
569                                         }
570                                 }
571                         }
572                 }
573         } else {
574                 if( DEBUG || (vlevel > 2) ) {
575                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
576                 }
577         }
578 }
579
580 /*
581         Trash_entry takes a partially parsed record from the input and
582         will delete the entry if the sender,mtype matches us or it's a
583         generic mtype. The refernce in the new table is removed and the
584         refcounter for the actual rte is decreased. If that ref count is
585         0 then the memory is freed (handled byh the del_rte call).
586 */
587 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
588         rtable_ent_t*   rte;            // route table entry to be 'deleted'
589         char const*     tok;
590         int             ntoks;
591         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
592         char*   tokens[128];
593
594         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
595                 return;
596         }
597
598         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
599
600         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
601                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
602                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
603
604                 key = build_rt_key( subid, atoi( ts_field ) );
605                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
606                 if( rte != NULL ) {
607                         if( DEBUG || (vlevel > 1) ) {
608                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
609                         }
610                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
611                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
612                 } else {
613                         if( DEBUG || (vlevel > 1) ) {
614                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
615                         }
616                 }
617         } else {
618                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
619         }
620 }
621
622 // -------------------------- parse functions --------------------------------------------------
623
624 /*
625         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
626         the 'owner' which should be the dns name or IP address of an enpoint
627         the meid_list is a space separated list of me IDs
628
629         This function assumes the caller has vetted the pointers as needed.
630
631         For each meid in the list, an entry is pushed into the hash which references the owner
632         endpoint such that when the meid is used to route a message it references the endpoint
633         to send messages to.
634 */
635 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
636         char const*     tok;
637         int             ntoks;
638         char*   tokens[128];
639         int             i;
640         int             state;
641         endpoint_t*     ep;                                             // endpoint struct for the owner
642
643         owner = clip( owner );                          // ditch extra whitespace and trailing comments
644         meid_list = clip( meid_list );
645
646         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
647         for( i = 0; i < ntoks; i++ ) {
648                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
649                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
650                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
651                 } else {
652                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
653                 }
654         }
655 }
656
657 /*
658         Given the tokens from an mme_del, delete the listed meid entries from the new
659         table. The list is a space separated list of meids.
660
661         The meids in the hash reference endpoints which are never deleted and so
662         the only thing that we need to do here is to remove the meid from the hash.
663
664         This function assumes the caller has vetted the pointers as needed.
665 */
666 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
667         char const*     tok;
668         int             ntoks;
669         char*   tokens[128];
670         int             i;
671
672         if( rtab->hash == NULL ) {
673                 return;
674         }
675
676         meid_list = clip( meid_list );
677
678         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
679         for( i = 0; i < ntoks; i++ ) {
680                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
681                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
682         }
683 }
684
685 /*
686         Parse a partially parsed meid record. Tokens[0] should be one of:
687                 meid_map, mme_ar, mme_del.
688
689         pctx is the private context needed to return an ack/nack using the provided
690         message buffer with the route managers address info.
691 */
692 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
693         char wbuf[1024];
694
695         if( tokens == NULL || ntoks < 1 ) {
696                 return;                                                 // silent but should never happen
697         }
698
699         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
700                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
701                 return;
702         }
703
704         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
705                 tokens[1] = clip( tokens[1] );
706                 if( *(tokens[1]) == 's' ) {
707                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
708                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
709                                 uta_rt_drop( ctx->new_rtable );
710                                 ctx->new_rtable = NULL;
711                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
712                         }
713
714                         if( ctx->table_id != NULL ) {
715                                 free( ctx->table_id );
716                         }
717                         if( ntoks > 2 ) {
718                                 ctx->table_id = strdup( clip( tokens[2] ) );
719                         } else {
720                                 ctx->table_id = NULL;
721                         }
722
723                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
724                         ctx->new_rtable->mupdates = 0;
725
726                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
727                 } else {
728                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
729                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
730                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
731                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
732                                                                 ctx->new_rtable->mupdates, tokens[2] );
733                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
734                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
735                                                 uta_rt_drop( ctx->new_rtable );
736                                                 ctx->new_rtable = NULL;
737                                                 return;
738                                         }
739
740                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
741                                 }
742
743                                 if( ctx->new_rtable ) {
744                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
745                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
746                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
747
748                                         if( vlevel > 0 ) {
749                                                 if( ctx->old_rtable != NULL ) {
750                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
751                                                         rt_stats( ctx->old_rtable );
752                                                 } else {
753                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
754                                                 }
755                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
756                                                 rt_stats( ctx->rtable );
757                                         }
758                                 } else {
759                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
760                                         ctx->new_rtable = NULL;
761                                 }
762                         }
763                 }
764
765                 return;
766         }
767
768         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
769                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
770                 return;
771         }
772
773         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
774                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
775                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
776                         return;
777                 }
778                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
779                 ctx->new_rtable->mupdates++;
780                 return;
781         }
782
783         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
784                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
785                 ctx->new_rtable->mupdates++;
786                 return;
787         }
788 }
789
790 /*
791         Parse a single record recevied from the route table generator, or read
792         from a static route table file.  Start records cause a new table to
793         be started (if a partial table was received it is discarded. Table
794         entry records are added to the currenly 'in progress' table, and an
795         end record causes the in progress table to be finalised and the
796         currently active table is replaced.
797
798         The updated table will be activated when the *|end record is encountered.
799         However, to allow for a "double" update, where both the meid map and the
800         route table must be updated at the same time, the end indication on a
801         route table (new or update) may specifiy "hold" which indicates that meid
802         map entries are to follow and the updated route table should be held as
803         pending until the end of the meid map is received and validated.
804
805         CAUTION:  we are assuming that there is a single route/meid map generator
806                 and as such only one type of update is received at a time; in other
807                 words, the sender cannot mix update records and if there is more than
808                 one sender process they must synchronise to avoid issues.
809
810
811         For a RT update, we expect:
812                 newrt | start | <table-id>
813                 newrt | end | <count>
814                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
815                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
816                 mse| <mtype>[,sender] | <sub-id> | %meid
817
818
819         For a meid map update we expect:
820                 meid_map | start | <table-id>
821                 meid_map | end | <count> | <md5-hash>
822                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
823                 mme_del | <meid0> <meid1>...<meidn>
824
825
826         The pctx is our private context that must be used to send acks/status
827         messages back to the route manager.  The regular ctx is the ctx that
828         the user has been given and thus that's where we have to hang the route
829         table we're working with.
830
831         If mbuf is given, and we need to ack, then we ack using the mbuf and a
832         return to sender call (allows route manager to use wh_call() to send
833         an update and rts is required to get that back to the right thread).
834         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
835         will be used.
836 */
837 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
838         int i;
839         int ntoks;                                                      // number of tokens found in something
840         int ngtoks;
841         int     grp;                                                    // group number
842         rtable_ent_t const*     rte;                    // route table entry added
843         char*   tokens[128];
844         char*   tok;                                            // pointer into a token or string
845         char    wbuf[1024];
846
847         if( ! buf ) {
848                 return;
849         }
850
851         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
852                 buf++;
853         }
854         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
855         *(tok+1) = 0;
856
857         memset( tokens, 0, sizeof( tokens ) );
858         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
859                 tokens[0] = clip( tokens[0] );
860                 switch( *(tokens[0]) ) {
861                         case 0:                                                                                                 // ignore blanks
862                                 // fallthrough
863                         case '#':                                                                                               // and comment lines
864                                 break;
865
866                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
867                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
868                                         break;
869                                 }
870
871                                 if( ntoks < 3 ) {
872                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
873                                         break;
874                                 }
875
876                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
877                                 ctx->new_rtable->updates++;
878                                 break;
879
880                         case 'n':                                                                                               // newrt|{start|end}
881                                 tokens[1] = clip( tokens[1] );
882                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
883                                         if( ntoks >2 ) {
884                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
885                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
886                                                                 ctx->new_rtable->updates, tokens[2] );
887                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
888                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
889                                                         uta_rt_drop( ctx->new_rtable );
890                                                         ctx->new_rtable = NULL;
891                                                         break;
892                                                 }
893                                         }
894
895                                         if( ctx->new_rtable ) {
896                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
897                                                 if( DEBUG > 1 || (vlevel > 1) ) {
898                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
899                                                         dump_tables( ctx );
900                                                 }
901
902                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
903                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
904                                         } else {
905                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
906                                                 ctx->new_rtable = NULL;
907                                         }
908                                 } else {                                                                                                                        // start a new table.
909                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
910                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
911
912                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
913                                                 uta_rt_drop( ctx->new_rtable );
914                                                 ctx->new_rtable = NULL;
915                                         }
916
917                                         if( ctx->table_id != NULL ) {
918                                                 free( ctx->table_id );
919                                         }
920                                         if( ntoks >2 ) {
921                                                 ctx->table_id = strdup( clip( tokens[2] ) );
922                                         } else {
923                                                 ctx->table_id = NULL;
924                                         }
925
926                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
927                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
928
929                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
930                                 }
931                                 break;
932
933                         case 'm':                                                                       // mse entry or one of the meid_ records
934                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
935                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
936                                                 break;
937                                         }
938
939                                         if( ntoks < 4 ) {
940                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
941                                                 break;
942                                         }
943
944                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
945                                         ctx->new_rtable->updates++;
946                                 } else {
947                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
948                                 }
949                                 break;
950
951                         case 'r':                                       // assume rt entry
952                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
953                                         break;
954                                 }
955
956                                 ctx->new_rtable->updates++;
957                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
958                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
959                                 } else {
960                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
961                                 }
962                                 break;
963
964                         case 'u':                                                                                               // update current table, not a total replacement
965                                 tokens[1] = clip( tokens[1] );
966                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
967                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
968                                                 break;
969                                         }
970
971                                         if( ntoks >2 ) {
972                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
973                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
974                                                                 ctx->new_rtable->updates, tokens[2] );
975                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
976                                                         uta_rt_drop( ctx->new_rtable );
977                                                         ctx->new_rtable = NULL;
978                                                         break;
979                                                 }
980                                         }
981
982                                         if( ctx->new_rtable ) {
983                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
984                                                 if( DEBUG > 1 || (vlevel > 1) )  {
985                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
986                                                         dump_tables( ctx );
987                                                 }
988
989                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
990                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
991                                         } else {
992                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
993                                                 ctx->new_rtable = NULL;
994                                         }
995                                 } else {                                                                                        // start a new table.
996                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
997                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
998                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
999                                                 uta_rt_drop( ctx->new_rtable );
1000                                                 ctx->new_rtable = NULL;
1001                                         }
1002
1003                                         if( ntoks > 2 ) {
1004                                                 if( ctx->table_id != NULL ) {
1005                                                         free( ctx->table_id );
1006                                                 }
1007                                                 ctx->table_id = strdup( clip( tokens[2] ) );
1008                                         }
1009
1010                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
1011                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
1012
1013                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
1014                                 }
1015                                 break;
1016
1017                         default:
1018                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
1019                                 break;
1020                 }
1021         }
1022 }
1023
1024 /*
1025         This function attempts to open a static route table in order to create a 'seed'
1026         table during initialisation.  The environment variable RMR_SEED_RT is expected
1027         to contain the necessary path to the file. If missing, or if the file is empty,
1028         no route table will be available until one is received from the generator.
1029
1030         This function is probably most useful for testing situations, or extreme
1031         cases where the routes are static.
1032 */
1033 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
1034         int             i;
1035         char*   fname;
1036         char*   fbuf;                           // buffer with file contents
1037         char*   rec;                            // start of the record
1038         char*   eor;                            // end of the record
1039         int             rcount = 0;                     // record count for debug
1040
1041         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
1042                 return;
1043         }
1044
1045         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1046                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1047                 return;
1048         }
1049
1050         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1051         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1052                 if( *eor == '\r' ) {
1053                         *eor = '\n';                                                            // will look like a blank line which is ok
1054                 }
1055         }
1056
1057         rec = fbuf;
1058         while( rec && *rec ) {
1059                 rcount++;
1060                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1061                         *eor = 0;
1062                 } else {
1063                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1064                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1065                         free( fbuf );
1066                         return;
1067                 }
1068
1069                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1070
1071                 rec = eor+1;
1072         }
1073
1074         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1075         free( fbuf );
1076 }
1077
1078 /*
1079         Callback driven for each thing in a symtab. We collect the pointers to those
1080         things for later use (cloning).
1081 */
1082 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1083         thing_list_t*   tl;
1084
1085         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1086                 return;
1087         }
1088
1089         if( thing == NULL ) {
1090                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1091                 return;
1092         }
1093
1094         tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
1095         tl->things[tl->nused++] = thing;                // save a reference to the thing
1096
1097         if( tl->nused >= tl->nalloc ) {
1098                 extend_things( tl );                            // not enough allocated
1099         }
1100 }
1101
1102 /*
1103         Called to delete a route table entry struct. We delete the array of endpoint
1104         pointers, but NOT the endpoints referenced as those are referenced from
1105         multiple entries.
1106
1107         Route table entries can be concurrently referenced by multiple symtabs, so
1108         the actual delete happens only if decrementing the rte's ref count takes it
1109         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1110         the symtab, or overlaying an entry.
1111
1112         This function uses ONLY the pointer to the rte (thing) and ignores the other
1113         information that symtab foreach function passes (st, entry, and data) which
1114         means that it _can_ safetly be used outside of the foreach setting. If
1115         the function is changed to depend on any of these three, then a stand-alone
1116         rte_cleanup() function should be added and referenced by this, and refererences
1117         to this outside of the foreach world should be changed.
1118 */
1119 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1120         rtable_ent_t*   rte;
1121         int i;
1122
1123         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1124                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1125                 return;
1126         }
1127
1128         rte->refs--;
1129         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1130                 return;
1131         }
1132
1133         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1134                 for( i = 0; i < rte->nrrgroups; i++ ) {
1135                         if( rte->rrgroups[i] ) {
1136                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1137                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1138                         }
1139
1140                 }
1141
1142                 free( rte->rrgroups );
1143         }
1144
1145         free( rte );                                                                                    // finally, drop the potato
1146 }
1147
1148 /*
1149         Read an entire file into a buffer. We assume for route table files
1150         they will be smallish and so this won't be a problem.
1151         Returns a pointer to the buffer, or nil. Caller must free.
1152         Terminates the buffer with a nil character for string processing.
1153
1154         If we cannot stat the file, we assume it's empty or missing and return
1155         an empty buffer, as opposed to a nil, so the caller can generate defaults
1156         or error if an empty/missing file isn't tolerated.
1157 */
1158 static char* uta_fib( char const* fname ) {
1159         struct stat     stats;
1160         off_t           fsize = 8192;   // size of the file
1161         off_t           nread;                  // number of bytes read
1162         int                     fd;
1163         char*           buf;                    // input buffer
1164
1165         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1166                 if( fstat( fd, &stats ) >= 0 ) {
1167                         if( stats.st_size <= 0 ) {                                      // empty file
1168                                 close( fd );
1169                                 fd = -1;
1170                         } else {
1171                                 fsize = stats.st_size;                                          // stat ok, save the file size
1172                         }
1173                 } else {
1174                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1175                 }
1176         }
1177
1178         if( fd < 0 ) {                                                                                  // didn't open or empty
1179                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1180                         return NULL;
1181                 }
1182
1183                 *buf = 0;
1184                 return buf;
1185         }
1186
1187         // add a size limit check here
1188
1189         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1190                 close( fd );
1191                 errno = ENOMEM;
1192                 return NULL;
1193         }
1194
1195         nread = read( fd, buf, fsize );
1196         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1197                 free( buf );
1198                 errno = EFBIG;                                                                                  // likely too much to handle
1199                 close( fd );
1200                 return NULL;
1201         }
1202
1203         buf[nread] = 0;
1204
1205         close( fd );
1206         return buf;
1207 }
1208
1209 // --------------------- initialisation/creation ---------------------------------------------
1210 /*
1211         Create and initialise a route table; Returns a pointer to the table struct.
1212 */
1213 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1214         route_table_t*  rt;
1215
1216         if( ctx == NULL ) {
1217                 return NULL;
1218         }
1219         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1220                 return NULL;
1221         }
1222
1223         memset( rt, 0, sizeof( *rt ) );
1224
1225         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1226                 free( rt );
1227                 return NULL;
1228         }
1229
1230         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1231         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1232         pthread_mutex_init( rt->gate, NULL );
1233
1234         return rt;
1235 }
1236
1237 /*
1238         Clones one of the spaces in the given table.
1239         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1240         Space is the space in the old table to copy. Space 0 uses an integer key and
1241         references rte structs. All other spaces use a string key and reference endpoints.
1242 */
1243 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1244         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1245         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1246         void*   sst;                    // source symtab
1247         void*   nst;                    // new symtab
1248         thing_list_t things;    // things from the space to copy
1249         int             i;
1250         int             free_on_err = 0;
1251
1252         if( ctx == NULL ) {
1253                 return NULL;
1254         }
1255         if( nrt == NULL ) {                             // make a new table if needed
1256                 free_on_err = 1;
1257                 nrt = uta_rt_init( ctx );
1258                 if( nrt == NULL ) {
1259                         return NULL;
1260                 }
1261         }
1262
1263         if( srt == NULL ) {             // source was nil, just give back the new table
1264                 return nrt;
1265         }
1266
1267         things.nalloc = 2048;
1268         things.nused = 0;
1269         things.error = 0;
1270         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1271         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1272         if( things.things == NULL || things.names == NULL ){
1273                 if( things.things != NULL ) {
1274                         free( things.things );
1275                 }
1276                 if( things.names != NULL ) {
1277                         free( things.names );
1278                 }
1279
1280                 if( free_on_err ) {
1281                         rmr_sym_free( nrt->hash );
1282                         free( nrt );
1283                         nrt = NULL;
1284                 } else {
1285                         nrt->error = 1;
1286                 }
1287
1288                 return nrt;
1289         }
1290         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1291         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1292
1293         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1294         nst = nrt->hash;
1295
1296         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1297         if( things.error ) {                            // something happened and capture failed
1298                 rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
1299                 free( things.things );
1300                 free( things.names );
1301                 if( free_on_err ) {
1302                         rmr_sym_free( nrt->hash );
1303                         free( nrt );
1304                         nrt = NULL;
1305                 } else {
1306                         nrt->error = 1;
1307                 }
1308                 return nrt;
1309         }
1310
1311         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1312         for( i = 0; i < things.nused; i++ ) {
1313                 if( space ) {                                                                                           // string key, epoint reference
1314                         ep = (endpoint_t *) things.things[i];
1315                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1316                 } else {
1317                         rte = (rtable_ent_t *) things.things[i];
1318                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1319                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1320                 }
1321         }
1322
1323         free( things.things );
1324         free( (void *) things.names );
1325         return nrt;
1326 }
1327
1328 /*
1329         Given a destination route table (drt), clone from the source (srt) into it.
1330         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1331         allocate the drt if that was nil too). If all is true (1), then we will clone both
1332         the MT and the ME spaces; otherwise only the ME space is cloned.
1333 */
1334 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1335         endpoint_t*             ep;                             // an endpoint
1336         rtable_ent_t*   rte;                    // a route table entry
1337         int i;
1338
1339         if( ctx == NULL ) {
1340                 return NULL;
1341         }
1342         if( drt == NULL ) {
1343                 drt = uta_rt_init( ctx );
1344         }
1345         if( srt == NULL ) {
1346                 return drt;
1347         }
1348
1349         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1350         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1351         if( all ) {
1352                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1353         }
1354
1355         return drt;
1356 }
1357
1358 /*
1359         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1360         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1361         context to be referenced by the new pointer; the old pointer is set to nil.
1362
1363         If the old table doesn't exist, then a new table is created and the new pointer is
1364         set to reference it.
1365
1366         The ME namespace references endpoints which do not need to be released, therefore we
1367         do not need to run that portion of the table to deref like we do for the RTEs.
1368 */
1369 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1370         int counter = 0;
1371         route_table_t*  rt;
1372
1373         if( ctx == NULL ) {
1374                 return NULL;
1375         }
1376
1377         if( (rt = ctx->old_rtable) != NULL ) {
1378                 ctx->old_rtable = NULL;
1379                 while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
1380                         if( counter++ > 1000 ) {
1381                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1382                                 break;
1383                         }
1384
1385                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1386                 }
1387
1388                 if( rt->hash != NULL ) {
1389                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1390                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1391                 }
1392
1393                 rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
1394         } else {
1395                 rt = NULL;
1396         }
1397
1398         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1399         if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
1400                 rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
1401         } else {
1402                 rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
1403                 rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
1404                 rt->error = 1;
1405         }
1406
1407         return rt;
1408 }
1409
1410
1411 /*
1412         Given a name, find the endpoint struct in the provided route table.
1413 */
1414 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1415
1416         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1417                 return NULL;
1418         }
1419
1420         return rmr_sym_get( rt->ephash, ep_name, 1 );
1421 }
1422
1423 /*
1424         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1425         Does NOT destroy the gate as it's a common gate for ALL route tables.
1426 */
1427 static void uta_rt_drop( route_table_t* rt ) {
1428         if( rt == NULL ) {
1429                 return;
1430         }
1431
1432         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1433         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1434         free( rt );
1435 }
1436
1437 /*
1438         Look up and return the pointer to the endpoint stuct matching the given name.
1439         If not in the hash, a new endpoint is created, added to the hash. Should always
1440         return a pointer.
1441 */
1442 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1443         endpoint_t*     ep;
1444
1445         if( !rt || !ep_name || ! *ep_name ) {
1446                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1447                 errno = EINVAL;
1448                 return NULL;
1449         }
1450
1451         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1452                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1453                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1454                         errno = ENOMEM;
1455                         return NULL;
1456                 }
1457
1458                 ep->notify = 1;                                                         // show notification on first connection failure
1459                 ep->open = 0;                                                           // not connected
1460                 ep->addr = uta_h2ip( ep_name );
1461                 ep->name = strdup( ep_name );
1462                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1463                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1464
1465                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1466         }
1467
1468         return ep;
1469 }
1470
1471
1472 /*
1473         Given a session id and message type build a key that can be used to look up the rte in the route
1474         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1475 */
1476 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1477         uint64_t key;
1478
1479         if( sub_id == UNSET_SUBID ) {
1480                 key = 0xffffffff00000000 | mtype;
1481         } else {
1482                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1483         }
1484
1485         return key;
1486 }
1487
1488 /*
1489         Given a route table and meid string, find the owner (if known). Returns a pointer to
1490         the endpoint struct or nil.
1491 */
1492 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1493         endpoint_t const* ep;           // the ep we found in the hash
1494
1495         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1496                 return NULL;
1497         }
1498
1499         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1500 }
1501
1502 /*
1503         This returns a pointer to the currently active route table and ups
1504         the reference count so that the route table is not freed while it
1505         is being used. The caller MUST call release_rt() when finished
1506         with the pointer.
1507
1508         Care must be taken: the ctx->rtable pointer _could_ change during the time
1509         between the release of the lock and the return. Therefore we MUST grab
1510         the current pointer when we have the lock so that if it does we don't
1511         return a pointer to the wrong table.
1512
1513         This will return NULL if there is no active table.
1514 */
1515 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1516         route_table_t*  rrt;                    // return value
1517
1518         if( ctx == NULL || ctx->rtable == NULL ) {
1519                 return NULL;
1520         }
1521
1522         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1523         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1524         rrt->ref_count++;
1525         pthread_mutex_unlock( ctx->rtgate );
1526
1527         return rrt;                                                                             // pointer we upped the count with
1528 }
1529
1530 /*
1531         This will "release" the route table by reducing the use counter
1532         in the table. The table may not be freed until the counter reaches
1533         0, so it's imparative that the pointer be "released" when it is
1534         fetched by get_rt().  Once the caller has released the table it
1535         may not safely use the pointer that it had.
1536 */
1537 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1538         if( ctx == NULL || rt == NULL ) {
1539                 return;
1540         }
1541
1542         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1543         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1544                 rt->ref_count--;
1545         }
1546         pthread_mutex_unlock( ctx->rtgate );
1547 }
1548 #endif