1d8d9ca244c18efd9cd43b102f45279fd94e9123
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49
50 #include <RIC_message_types.h>          // needed for route manager messages
51
52 #define ALL 1
53 #define SOME 0
54
55 /*
56         Passed to a symtab foreach callback to construct a list of pointers from
57         a current symtab.
58 */
59 typedef struct thing_list {
60         int nalloc;
61         int nused;
62         void** things;
63         const char** names;
64 } thing_list_t;
65
66 // ---- debugging/testing -------------------------------------------------------------------------
67
68 /*
69         Dump some stats for an endpoint in the RT. This is generally called to
70         verify endpoints after a table load/change.
71
72         This is called by the for-each mechanism of the symtab and the prototype is
73         fixe; we don't really use some of the parms, but have dummy references to
74         keep sonar from complaining.
75 */
76 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
77         int*    counter;
78         endpoint_t* ep;
79
80         if( (ep = (endpoint_t *) thing) == NULL ) {
81                 return;
82         }
83
84         if( (counter = (int *) vcounter) != NULL ) {
85                 (*counter)++;
86         } else {
87                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
88         }
89
90         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
91 }
92
93 /*
94         Called to count meid entries in the table. The meid points to an 'owning' endpoint
95         so we can list what we find
96
97         See note in ep_stats about dummy refs.
98 */
99 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
100         int*    counter;
101         endpoint_t* ep;
102
103         if( (ep = (endpoint_t *) thing) == NULL ) {
104                 return;
105         }
106
107         if( (counter = (int *) vcounter) != NULL ) {
108                 (*counter)++;
109         } else {
110                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
111         }
112
113         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
114 }
115
116 /*
117         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
118         the 'source' information and is added to each message.
119
120         See note above about dummy references.
121 */
122 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
123         endpoint_t* ep;
124         char*   id;
125
126         if( (ep = (endpoint_t *) thing) == NULL ) {
127                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
128                 return;
129         }
130
131         if( (id = (char *) vid) == NULL ) {
132                 id = "missing";
133         }
134
135         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
136                 (long long) time( NULL ),
137                 id,
138                 ep->name,
139                 ep->open,
140                 ep->scounts[EPSC_GOOD],
141                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
142                 ep->scounts[EPSC_FAIL],
143                 ep->scounts[EPSC_TRANS]   );
144 }
145
146 /*
147         Dump stats for a route entry in the table.
148 */
149 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
150         int*    counter;
151         rtable_ent_t const* rte;                // thing is really an rte
152         int             mtype;
153         int             sid;
154
155         if( (rte = (rtable_ent_t *) thing) == NULL ) {
156                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
157                 return;
158         }
159
160         if( (counter = (int *) vcounter) != NULL ) {
161                 (*counter)++;
162         }
163
164         mtype = rte->key & 0xffff;
165         sid = (int) (rte->key >> 32);
166
167         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
168 }
169
170 /*
171         Given a route table, cause some stats to be spit out.
172 */
173 static void  rt_stats( route_table_t* rt ) {
174         int* counter;
175
176         if( rt == NULL ) {
177                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
178                 return;
179         }
180
181         counter = (int *) malloc( sizeof( int ) );
182         *counter = 0;
183         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
184         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
185         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
186         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
187
188         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
189         *counter = 0;
190         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
191         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
192
193         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
194         *counter = 0;
195         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
196         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
197
198         free( counter );
199 }
200
201 /*
202         Given a route table, cause endpoint counters to be written to stderr. The id
203         parm is written as the "source" in the output.
204 */
205 static void  rt_epcounts( route_table_t* rt, char* id ) {
206         if( rt == NULL ) {
207                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
208                 return;
209         }
210
211         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
212 }
213
214
215 static void dump_tables( uta_ctx_t *ctx ) {
216         if( ctx->old_rtable != NULL ) {
217                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
218                 rt_stats( ctx->old_rtable );
219         } else {
220                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
221         }
222         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
223         rt_stats( ctx->rtable );
224 }
225
226 // ------------ route manager communication -------------------------------------------------
227 /*
228         Send a request for a table update to the route manager. Updates come in
229         async, so send and go.
230
231         pctx is the private context for the thread; ctx is the application context
232         that we need to be able to send the application ID in case rt mgr needs to
233         use it to idenfity us.
234
235         Returns 0 if we were not able to send a request.
236 */
237 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
238         rmr_mbuf_t*     smsg;
239         int     state = 0;
240
241         if( ctx->rtg_whid < 0 ) {
242                 return state;
243         }
244
245         smsg = rmr_alloc_msg( pctx, 1024 );
246         if( smsg != NULL ) {
247                 smsg->mtype = RMRRM_REQ_TABLE;
248                 smsg->sub_id = 0;
249                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
250                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
251                 smsg->len = strlen( smsg->payload ) + 1;
252
253                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
254                 if( (state = smsg->state) != RMR_OK ) {
255                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
256                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
257                         ctx->rtg_whid = -1;
258                 }
259
260                 rmr_free_msg( smsg );
261         }
262
263         return state;
264 }
265
266 /*
267         Send an ack to the route table manager for a table ID that we are
268         processing.      State is 1 for OK, and 0 for failed. Reason might
269         be populated if we know why there was a failure.
270
271         Context should be the PRIVATE context that we use for messages
272         to route manger and NOT the user's context.
273
274         If a message buffere is passed we use that and use return to sender
275         assuming that this might be a response to a call and that is needed
276         to send back to the proper calling thread. If msg is nil, we allocate
277         and use it.
278 */
279 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
280         int             use_rts = 1;
281         int             payload_size = 1024;
282
283         if( ctx == NULL || ctx->rtg_whid < 0 ) {
284                 return;
285         }
286
287         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
288                 return;
289         }
290
291         if( smsg != NULL ) {
292                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
293         } else {
294                 use_rts = 0;
295                 smsg = rmr_alloc_msg( ctx, payload_size );
296         }
297
298         if( smsg != NULL ) {
299                 smsg->mtype = RMRRM_TABLE_STATE;
300                 smsg->sub_id = -1;
301                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
302                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
303
304                 smsg->len = strlen( smsg->payload ) + 1;
305
306                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
307                 if( use_rts ) {
308                         smsg = rmr_rts_msg( ctx, smsg );
309                 } else {
310                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
311                 }
312                 if( (state = smsg->state) != RMR_OK ) {
313                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
314                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
315                         ctx->rtg_whid = -1;
316                 }
317
318                 if( ! use_rts ) {
319                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
320                 }
321         }
322 }
323
324 // ---- utility -----------------------------------------------------------------------------------
325 /*
326         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
327         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
328 */
329 static char* clip( char* buf ) {
330         char*   tok;
331
332         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
333                 buf++;
334         }
335
336         if( (tok = strchr( buf, '#' )) != NULL ) {
337                 if( tok == buf ) {
338                         return buf;                                     // just push back; leading comment sym handled there
339                 }
340
341                 if( isspace( *(tok-1) ) ) {
342                         *tok = 0;
343                 }
344         }
345
346         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
347         *(tok+1) = 0;
348
349         return buf;
350 }
351
352 /*
353         This accepts a pointer to a nil terminated string, and ensures that there is a
354         newline as the last character. If there is not, a new buffer is allocated and
355         the newline is added.  If a new buffer is allocated, the buffer passed in is
356         freed.  The function returns a pointer which the caller should use, and must
357         free.  In the event of an error, a nil pointer is returned.
358 */
359 static char* ensure_nlterm( char* buf ) {
360         char*   nb = NULL;
361         int             len = 0;
362
363         if( buf != NULL ) {
364                 len = strlen( buf );
365         }
366
367         nb = buf;                                                       // default to returning original as is
368         switch( len ) {
369                 case 0:
370                         nb = strdup( "\n" );
371                         break;
372
373                 case 1:
374                         if( *buf != '\n' ) {            // not a newline; realloc
375                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
376                                 nb = strdup( " \n" );
377                                 *nb = *buf;
378                                 free( buf );
379                         }
380                         break;
381
382                 default:
383                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
384                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
385                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
386                                         memcpy( nb, buf, len );
387                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
388                                         *(nb+len+1) = 0;
389                                         free( buf );
390                                 }
391                         }
392                         break;
393         }
394
395         return nb;
396 }
397
398 /*
399         Roll the new table into the active and the active into the old table. We
400         must have the lock on the active table to do this. It's possible that there
401         is no active table (first load), so we have to account for that (no locking).
402 */
403 static void roll_tables( uta_ctx_t* ctx ) {
404
405         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
406                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
407                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
408                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
409                 pthread_mutex_unlock( ctx->rtgate );
410         } else {
411                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
412                 ctx->rtable = ctx->new_rtable;                          // make new the active one
413         }
414
415         ctx->new_rtable = NULL;
416 }
417
418 // ------------ entry update functions ---------------------------------------------------------------
419 /*
420         Given a message type create a route table entry and add to the hash keyed on the
421         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
422         is the number of group slots to allocate in the entry.
423 */
424 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
425         rtable_ent_t* rte;
426         rtable_ent_t* old_rte;          // entry which was already in the table for the key
427
428         if( rt == NULL ) {
429                 return NULL;
430         }
431
432         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
433                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
434                 return NULL;
435         }
436         memset( rte, 0, sizeof( *rte ) );
437         rte->refs = 1;
438         rte->key = key;
439
440         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
441                 nrrgroups = 10;
442         }
443
444         if( nrrgroups ) {
445                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
446                         free( rte );
447                         return NULL;
448                 }
449                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
450         } else {
451                 rte->rrgroups = NULL;
452         }
453
454         rte->nrrgroups = nrrgroups;
455
456         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
457                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
458         }
459
460         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
461
462         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
463         return rte;
464 }
465
466 /*
467         This accepts partially parsed information from an rte or mse record sent by route manager or read from
468         a file such that:
469                 ts_field is the msg-type,sender field
470                 subid is the integer subscription id
471                 rr_field is the endpoint information for round robening message over
472
473         If all goes well, this will add an RTE to the table under construction.
474
475         The ts_field is checked to see if we should ingest this record. We ingest if one of
476         these is true:
477                 there is no sender info (a generic entry for all)
478                 there is sender and our host:port matches one of the senders
479                 the sender info is an IP address that matches one of our IP addresses
480 */
481 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
482         rtable_ent_t*   rte;            // route table entry added
483         char const*     tok;
484         int             ntoks;
485         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
486         char*   tokens[128];
487         char*   gtokens[64];
488         int             i;
489         int             ngtoks;                         // number of tokens in the group list
490         int             grp;                            // index into group list
491         int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
492         int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
493
494         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
495         rr_field = clip( rr_field );
496
497         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
498                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
499                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
500
501                 key = build_rt_key( subid, atoi( ts_field ) );
502
503                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
504
505                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
506                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
507                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
508                         }
509                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
510                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
511
512                         for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
513                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
514                                         for( i = 0; i < ntoks; i++ ) {
515                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
516                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
517                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
518                                                         has_ep = TRUE;
519                                                 }
520                                         }
521                                         if( has_ep ) {
522                                                 cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
523                                                 has_ep = FALSE;
524                                         }
525                                 }
526                         }
527                 }
528         } else {
529                 if( DEBUG || (vlevel > 2) ) {
530                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
531                 }
532         }
533 }
534
535 /*
536         Trash_entry takes a partially parsed record from the input and
537         will delete the entry if the sender,mtype matches us or it's a
538         generic mtype. The refernce in the new table is removed and the
539         refcounter for the actual rte is decreased. If that ref count is
540         0 then the memory is freed (handled byh the del_rte call).
541 */
542 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
543         rtable_ent_t*   rte;            // route table entry to be 'deleted'
544         char const*     tok;
545         int             ntoks;
546         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
547         char*   tokens[128];
548
549         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
550                 return;
551         }
552
553         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
554
555         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
556                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
557                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
558
559                 key = build_rt_key( subid, atoi( ts_field ) );
560                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
561                 if( rte != NULL ) {
562                         if( DEBUG || (vlevel > 1) ) {
563                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
564                         }
565                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
566                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
567                 } else {
568                         if( DEBUG || (vlevel > 1) ) {
569                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
570                         }
571                 }
572         } else {
573                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
574         }
575 }
576
577 // -------------------------- parse functions --------------------------------------------------
578
579 /*
580         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
581         the 'owner' which should be the dns name or IP address of an enpoint
582         the meid_list is a space separated list of me IDs
583
584         This function assumes the caller has vetted the pointers as needed.
585
586         For each meid in the list, an entry is pushed into the hash which references the owner
587         endpoint such that when the meid is used to route a message it references the endpoint
588         to send messages to.
589 */
590 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
591         char const*     tok;
592         int             ntoks;
593         char*   tokens[128];
594         int             i;
595         int             state;
596         endpoint_t*     ep;                                             // endpoint struct for the owner
597
598         owner = clip( owner );                          // ditch extra whitespace and trailing comments
599         meid_list = clip( meid_list );
600
601         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
602         for( i = 0; i < ntoks; i++ ) {
603                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
604                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
605                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
606                 } else {
607                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
608                 }
609         }
610 }
611
612 /*
613         Given the tokens from an mme_del, delete the listed meid entries from the new
614         table. The list is a space separated list of meids.
615
616         The meids in the hash reference endpoints which are never deleted and so
617         the only thing that we need to do here is to remove the meid from the hash.
618
619         This function assumes the caller has vetted the pointers as needed.
620 */
621 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
622         char const*     tok;
623         int             ntoks;
624         char*   tokens[128];
625         int             i;
626
627         if( rtab->hash == NULL ) {
628                 return;
629         }
630
631         meid_list = clip( meid_list );
632
633         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
634         for( i = 0; i < ntoks; i++ ) {
635                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
636                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
637         }
638 }
639
640 /*
641         Parse a partially parsed meid record. Tokens[0] should be one of:
642                 meid_map, mme_ar, mme_del.
643
644         pctx is the private context needed to return an ack/nack using the provided
645         message buffer with the route managers address info.
646 */
647 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
648         char wbuf[1024];
649
650         if( tokens == NULL || ntoks < 1 ) {
651                 return;                                                 // silent but should never happen
652         }
653
654         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
655                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
656                 return;
657         }
658
659         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
660                 tokens[1] = clip( tokens[1] );
661                 if( *(tokens[1]) == 's' ) {
662                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
663                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
664                                 uta_rt_drop( ctx->new_rtable );
665                                 ctx->new_rtable = NULL;
666                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
667                         }
668
669                         if( ctx->table_id != NULL ) {
670                                 free( ctx->table_id );
671                         }
672                         if( ntoks > 2 ) {
673                                 ctx->table_id = strdup( clip( tokens[2] ) );
674                         } else {
675                                 ctx->table_id = NULL;
676                         }
677
678                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
679                         ctx->new_rtable->mupdates = 0;
680
681                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
682                 } else {
683                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
684                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
685                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
686                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
687                                                                 ctx->new_rtable->mupdates, tokens[2] );
688                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
689                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
690                                                 uta_rt_drop( ctx->new_rtable );
691                                                 ctx->new_rtable = NULL;
692                                                 return;
693                                         }
694
695                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
696                                 }
697
698                                 if( ctx->new_rtable ) {
699                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
700                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
701                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
702
703                                         if( vlevel > 0 ) {
704                                                 if( ctx->old_rtable != NULL ) {
705                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
706                                                         rt_stats( ctx->old_rtable );
707                                                 } else {
708                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
709                                                 }
710                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
711                                                 rt_stats( ctx->rtable );
712                                         }
713                                 } else {
714                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
715                                         ctx->new_rtable = NULL;
716                                 }
717                         }
718                 }
719
720                 return;
721         }
722
723         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
724                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
725                 return;
726         }
727
728         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
729                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
730                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
731                         return;
732                 }
733                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
734                 ctx->new_rtable->mupdates++;
735                 return;
736         }
737
738         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
739                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
740                 ctx->new_rtable->mupdates++;
741                 return;
742         }
743 }
744
745 /*
746         Parse a single record recevied from the route table generator, or read
747         from a static route table file.  Start records cause a new table to
748         be started (if a partial table was received it is discarded. Table
749         entry records are added to the currenly 'in progress' table, and an
750         end record causes the in progress table to be finalised and the
751         currently active table is replaced.
752
753         The updated table will be activated when the *|end record is encountered.
754         However, to allow for a "double" update, where both the meid map and the
755         route table must be updated at the same time, the end indication on a
756         route table (new or update) may specifiy "hold" which indicates that meid
757         map entries are to follow and the updated route table should be held as
758         pending until the end of the meid map is received and validated.
759
760         CAUTION:  we are assuming that there is a single route/meid map generator
761                 and as such only one type of update is received at a time; in other
762                 words, the sender cannot mix update records and if there is more than
763                 one sender process they must synchronise to avoid issues.
764
765
766         For a RT update, we expect:
767                 newrt | start | <table-id>
768                 newrt | end | <count>
769                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
770                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
771                 mse| <mtype>[,sender] | <sub-id> | %meid
772
773
774         For a meid map update we expect:
775                 meid_map | start | <table-id>
776                 meid_map | end | <count> | <md5-hash>
777                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
778                 mme_del | <meid0> <meid1>...<meidn>
779
780
781         The pctx is our private context that must be used to send acks/status
782         messages back to the route manager.  The regular ctx is the ctx that
783         the user has been given and thus that's where we have to hang the route
784         table we're working with.
785
786         If mbuf is given, and we need to ack, then we ack using the mbuf and a
787         return to sender call (allows route manager to use wh_call() to send
788         an update and rts is required to get that back to the right thread).
789         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
790         will be used.
791 */
792 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
793         int i;
794         int ntoks;                                                      // number of tokens found in something
795         int ngtoks;
796         int     grp;                                                    // group number
797         rtable_ent_t const*     rte;                    // route table entry added
798         char*   tokens[128];
799         char*   tok;                                            // pointer into a token or string
800         char    wbuf[1024];
801
802         if( ! buf ) {
803                 return;
804         }
805
806         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
807                 buf++;
808         }
809         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
810         *(tok+1) = 0;
811
812         memset( tokens, 0, sizeof( tokens ) );
813         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
814                 tokens[0] = clip( tokens[0] );
815                 switch( *(tokens[0]) ) {
816                         case 0:                                                                                                 // ignore blanks
817                                 // fallthrough
818                         case '#':                                                                                               // and comment lines
819                                 break;
820
821                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
822                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
823                                         break;
824                                 }
825
826                                 if( ntoks < 3 ) {
827                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
828                                         break;
829                                 }
830
831                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
832                                 ctx->new_rtable->updates++;
833                                 break;
834
835                         case 'n':                                                                                               // newrt|{start|end}
836                                 tokens[1] = clip( tokens[1] );
837                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
838                                         if( ntoks >2 ) {
839                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
840                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
841                                                                 ctx->new_rtable->updates, tokens[2] );
842                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
843                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
844                                                         uta_rt_drop( ctx->new_rtable );
845                                                         ctx->new_rtable = NULL;
846                                                         break;
847                                                 }
848                                         }
849
850                                         if( ctx->new_rtable ) {
851                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
852                                                 if( DEBUG > 1 || (vlevel > 1) ) {
853                                                         rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
854                                                         dump_tables( ctx );
855                                                 }
856
857                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
858                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
859                                         } else {
860                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
861                                                 ctx->new_rtable = NULL;
862                                         }
863                                 } else {                                                                                                                        // start a new table.
864                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
865                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
866
867                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
868                                                 uta_rt_drop( ctx->new_rtable );
869                                                 ctx->new_rtable = NULL;
870                                         }
871
872                                         if( ctx->table_id != NULL ) {
873                                                 free( ctx->table_id );
874                                         }
875                                         if( ntoks >2 ) {
876                                                 ctx->table_id = strdup( clip( tokens[2] ) );
877                                         } else {
878                                                 ctx->table_id = NULL;
879                                         }
880
881                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
882                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
883
884                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
885                                 }
886                                 break;
887
888                         case 'm':                                                                       // mse entry or one of the meid_ records
889                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
890                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
891                                                 break;
892                                         }
893
894                                         if( ntoks < 4 ) {
895                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
896                                                 break;
897                                         }
898
899                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
900                                         ctx->new_rtable->updates++;
901                                 } else {
902                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
903                                 }
904                                 break;
905
906                         case 'r':                                       // assume rt entry
907                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
908                                         break;
909                                 }
910
911                                 ctx->new_rtable->updates++;
912                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
913                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
914                                 } else {
915                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
916                                 }
917                                 break;
918
919                         case 'u':                                                                                               // update current table, not a total replacement
920                                 tokens[1] = clip( tokens[1] );
921                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
922                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
923                                                 break;
924                                         }
925
926                                         if( ntoks >2 ) {
927                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
928                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
929                                                                 ctx->new_rtable->updates, tokens[2] );
930                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
931                                                         uta_rt_drop( ctx->new_rtable );
932                                                         ctx->new_rtable = NULL;
933                                                         break;
934                                                 }
935                                         }
936
937                                         if( ctx->new_rtable ) {
938                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
939                                                 if( DEBUG > 1 || (vlevel > 1) )  {
940                                                         rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
941                                                         dump_tables( ctx );
942                                                 }
943
944                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
945                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
946                                         } else {
947                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
948                                                 ctx->new_rtable = NULL;
949                                         }
950                                 } else {                                                                                        // start a new table.
951                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
952                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
953                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
954                                                 uta_rt_drop( ctx->new_rtable );
955                                                 ctx->new_rtable = NULL;
956                                         }
957
958                                         if( ntoks > 2 ) {
959                                                 if( ctx->table_id != NULL ) {
960                                                         free( ctx->table_id );
961                                                 }
962                                                 ctx->table_id = strdup( clip( tokens[2] ) );
963                                         }
964
965                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
966                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
967
968                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
969                                 }
970                                 break;
971
972                         default:
973                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
974                                 break;
975                 }
976         }
977 }
978
979 /*
980         This function attempts to open a static route table in order to create a 'seed'
981         table during initialisation.  The environment variable RMR_SEED_RT is expected
982         to contain the necessary path to the file. If missing, or if the file is empty,
983         no route table will be available until one is received from the generator.
984
985         This function is probably most useful for testing situations, or extreme
986         cases where the routes are static.
987 */
988 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
989         int             i;
990         char*   fname;
991         char*   fbuf;                           // buffer with file contents
992         char*   rec;                            // start of the record
993         char*   eor;                            // end of the record
994         int             rcount = 0;                     // record count for debug
995
996         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
997                 return;
998         }
999
1000         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
1001                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
1002                 return;
1003         }
1004
1005         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
1006         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
1007                 if( *eor == '\r' ) {
1008                         *eor = '\n';                                                            // will look like a blank line which is ok
1009                 }
1010         }
1011
1012         rec = fbuf;
1013         while( rec && *rec ) {
1014                 rcount++;
1015                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1016                         *eor = 0;
1017                 } else {
1018                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1019                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1020                         free( fbuf );
1021                         return;
1022                 }
1023
1024                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1025
1026                 rec = eor+1;
1027         }
1028
1029         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1030         free( fbuf );
1031 }
1032
1033 /*
1034         Callback driven for each named thing in a symtab. We collect the pointers to those
1035         things for later use (cloning).
1036 */
1037 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1038         thing_list_t*   tl;
1039
1040         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1041                 return;
1042         }
1043
1044         if( thing == NULL ) {
1045                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1046                 return;
1047         }
1048
1049         tl->names[tl->nused] = name;                    // the name/key
1050         tl->things[tl->nused++] = thing;                // save a reference to the thing
1051 }
1052
1053 /*
1054         Called to delete a route table entry struct. We delete the array of endpoint
1055         pointers, but NOT the endpoints referenced as those are referenced from
1056         multiple entries.
1057
1058         Route table entries can be concurrently referenced by multiple symtabs, so
1059         the actual delete happens only if decrementing the rte's ref count takes it
1060         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1061         the symtab, or overlaying an entry.
1062
1063         This function uses ONLY the pointer to the rte (thing) and ignores the other
1064         information that symtab foreach function passes (st, entry, and data) which
1065         means that it _can_ safetly be used outside of the foreach setting. If
1066         the function is changed to depend on any of these three, then a stand-alone
1067         rte_cleanup() function should be added and referenced by this, and refererences
1068         to this outside of the foreach world should be changed.
1069 */
1070 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1071         rtable_ent_t*   rte;
1072         int i;
1073
1074         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1075                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1076                 return;
1077         }
1078
1079         rte->refs--;
1080         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1081                 return;
1082         }
1083
1084         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1085                 for( i = 0; i < rte->nrrgroups; i++ ) {
1086                         if( rte->rrgroups[i] ) {
1087                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1088                                 free( rte->rrgroups[i] );                               // but must free the rrg itself too
1089                         }
1090
1091                 }
1092
1093                 free( rte->rrgroups );
1094         }
1095
1096         free( rte );                                                                                    // finally, drop the potato
1097 }
1098
1099 /*
1100         Read an entire file into a buffer. We assume for route table files
1101         they will be smallish and so this won't be a problem.
1102         Returns a pointer to the buffer, or nil. Caller must free.
1103         Terminates the buffer with a nil character for string processing.
1104
1105         If we cannot stat the file, we assume it's empty or missing and return
1106         an empty buffer, as opposed to a nil, so the caller can generate defaults
1107         or error if an empty/missing file isn't tolerated.
1108 */
1109 static char* uta_fib( char const* fname ) {
1110         struct stat     stats;
1111         off_t           fsize = 8192;   // size of the file
1112         off_t           nread;                  // number of bytes read
1113         int                     fd;
1114         char*           buf;                    // input buffer
1115
1116         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1117                 if( fstat( fd, &stats ) >= 0 ) {
1118                         if( stats.st_size <= 0 ) {                                      // empty file
1119                                 close( fd );
1120                                 fd = -1;
1121                         } else {
1122                                 fsize = stats.st_size;                                          // stat ok, save the file size
1123                         }
1124                 } else {
1125                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1126                 }
1127         }
1128
1129         if( fd < 0 ) {                                                                                  // didn't open or empty
1130                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1131                         return NULL;
1132                 }
1133
1134                 *buf = 0;
1135                 return buf;
1136         }
1137
1138         // add a size limit check here
1139
1140         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1141                 close( fd );
1142                 errno = ENOMEM;
1143                 return NULL;
1144         }
1145
1146         nread = read( fd, buf, fsize );
1147         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1148                 free( buf );
1149                 errno = EFBIG;                                                                                  // likely too much to handle
1150                 close( fd );
1151                 return NULL;
1152         }
1153
1154         buf[nread] = 0;
1155
1156         close( fd );
1157         return buf;
1158 }
1159
1160 // --------------------- initialisation/creation ---------------------------------------------
1161 /*
1162         Create and initialise a route table; Returns a pointer to the table struct.
1163 */
1164 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1165         route_table_t*  rt;
1166
1167         if( ctx == NULL ) {
1168                 return NULL;
1169         }
1170         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1171                 return NULL;
1172         }
1173
1174         memset( rt, 0, sizeof( *rt ) );
1175
1176         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1177                 free( rt );
1178                 return NULL;
1179         }
1180
1181         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1182         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1183         pthread_mutex_init( rt->gate, NULL );
1184
1185         return rt;
1186 }
1187
1188 /*
1189         Clones one of the spaces in the given table.
1190         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1191         Space is the space in the old table to copy. Space 0 uses an integer key and
1192         references rte structs. All other spaces use a string key and reference endpoints.
1193 */
1194 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1195         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1196         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1197         void*   sst;                    // source symtab
1198         void*   nst;                    // new symtab
1199         thing_list_t things;    // things from the space to copy
1200         int             i;
1201         int             free_on_err = 0;
1202
1203         if( ctx == NULL ) {
1204                 return NULL;
1205         }
1206         if( nrt == NULL ) {                             // make a new table if needed
1207                 free_on_err = 1;
1208                 nrt = uta_rt_init( ctx );
1209                 if( nrt == NULL ) {
1210                         return NULL;
1211                 }
1212         }
1213
1214         if( srt == NULL ) {             // source was nil, just give back the new table
1215                 return nrt;
1216         }
1217
1218         things.nalloc = 2048;
1219         things.nused = 0;
1220         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1221         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1222         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1223         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1224         if( things.things == NULL ) {
1225                 if( free_on_err ) {
1226                         rmr_sym_free( nrt->hash );
1227                         free( nrt );
1228                         nrt = NULL;
1229                 }
1230
1231                 return nrt;
1232         }
1233
1234         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1235         nst = nrt->hash;
1236
1237         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1238
1239         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1240         for( i = 0; i < things.nused; i++ ) {
1241                 if( space ) {                                                                                           // string key, epoint reference
1242                         ep = (endpoint_t *) things.things[i];
1243                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1244                 } else {
1245                         rte = (rtable_ent_t *) things.things[i];
1246                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1247                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1248                 }
1249         }
1250
1251         free( things.things );
1252         free( (void *) things.names );
1253         return nrt;
1254 }
1255
1256 /*
1257         Given a destination route table (drt), clone from the source (srt) into it.
1258         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1259         allocate the drt if that was nil too). If all is true (1), then we will clone both
1260         the MT and the ME spaces; otherwise only the ME space is cloned.
1261 */
1262 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1263         endpoint_t*             ep;                             // an endpoint
1264         rtable_ent_t*   rte;                    // a route table entry
1265         int i;
1266
1267         if( ctx == NULL ) {
1268                 return NULL;
1269         }
1270         if( drt == NULL ) {
1271                 drt = uta_rt_init( ctx );
1272         }
1273         if( srt == NULL ) {
1274                 return drt;
1275         }
1276
1277         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1278         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1279         if( all ) {
1280                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1281         }
1282
1283         return drt;
1284 }
1285
1286 /*
1287         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1288         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1289         context to be referenced by the new pointer; the old pointer is set to nil.
1290
1291         If the old table doesn't exist, then a new table is created and the new pointer is
1292         set to reference it.
1293 */
1294 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1295         int counter = 0;
1296         route_table_t*  rt;
1297
1298         if( ctx == NULL ) {
1299                 return NULL;
1300         }
1301
1302         if( (rt = ctx->old_rtable) != NULL ) {
1303                 ctx->old_rtable = NULL;
1304                 while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
1305                         if( counter++ > 1000 ) {
1306                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1307                                 break;
1308                         }
1309
1310                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1311                 }
1312
1313                 if( rt->hash != NULL ) {
1314                         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
1315                         rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
1316                 }
1317         } else {
1318                 rt = NULL;
1319         }
1320
1321         rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
1322         rt->ref_count = 0;                                                                      // take no chances; ensure it's 0!
1323
1324         return rt;
1325 }
1326
1327
1328 /*
1329         Given a name, find the endpoint struct in the provided route table.
1330 */
1331 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1332
1333         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1334                 return NULL;
1335         }
1336
1337         return rmr_sym_get( rt->ephash, ep_name, 1 );
1338 }
1339
1340 /*
1341         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1342         Does NOT destroy the gate as it's a common gate for ALL route tables.
1343 */
1344 static void uta_rt_drop( route_table_t* rt ) {
1345         if( rt == NULL ) {
1346                 return;
1347         }
1348
1349         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1350         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1351         free( rt );
1352 }
1353
1354 /*
1355         Look up and return the pointer to the endpoint stuct matching the given name.
1356         If not in the hash, a new endpoint is created, added to the hash. Should always
1357         return a pointer.
1358 */
1359 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1360         endpoint_t*     ep;
1361
1362         if( !rt || !ep_name || ! *ep_name ) {
1363                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1364                 errno = EINVAL;
1365                 return NULL;
1366         }
1367
1368         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1369                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1370                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1371                         errno = ENOMEM;
1372                         return NULL;
1373                 }
1374
1375                 ep->notify = 1;                                                         // show notification on first connection failure
1376                 ep->open = 0;                                                           // not connected
1377                 ep->addr = uta_h2ip( ep_name );
1378                 ep->name = strdup( ep_name );
1379                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1380                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1381
1382                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1383         }
1384
1385         return ep;
1386 }
1387
1388
1389 /*
1390         Given a session id and message type build a key that can be used to look up the rte in the route
1391         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1392 */
1393 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1394         uint64_t key;
1395
1396         if( sub_id == UNSET_SUBID ) {
1397                 key = 0xffffffff00000000 | mtype;
1398         } else {
1399                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1400         }
1401
1402         return key;
1403 }
1404
1405 /*
1406         Given a route table and meid string, find the owner (if known). Returns a pointer to
1407         the endpoint struct or nil.
1408 */
1409 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1410         endpoint_t const* ep;           // the ep we found in the hash
1411
1412         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1413                 return NULL;
1414         }
1415
1416         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1417 }
1418
1419 /*
1420         This returns a pointer to the currently active route table and ups
1421         the reference count so that the route table is not freed while it
1422         is being used. The caller MUST call release_rt() when finished
1423         with the pointer.
1424
1425         Care must be taken: the ctx->rtable pointer _could_ change during the time
1426         between the release of the lock and the return. Therefore we MUST grab
1427         the current pointer when we have the lock so that if it does we don't
1428         return a pointer to the wrong table.
1429
1430         This will return NULL if there is no active table.
1431 */
1432 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1433         route_table_t*  rrt;                    // return value
1434
1435         if( ctx == NULL || ctx->rtable == NULL ) {
1436                 return NULL;
1437         }
1438
1439         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1440         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1441         rrt->ref_count++;
1442         pthread_mutex_unlock( ctx->rtgate );
1443
1444         return rrt;                                                                             // pointer we upped the count with
1445 }
1446
1447 /*
1448         This will "release" the route table by reducing the use counter
1449         in the table. The table may not be freed until the counter reaches
1450         0, so it's imparative that the pointer be "released" when it is
1451         fetched by get_rt().  Once the caller has released the table it
1452         may not safely use the pointer that it had.
1453 */
1454 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1455         if( ctx == NULL || rt == NULL ) {
1456                 return;
1457         }
1458
1459         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1460         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1461                 rt->ref_count--;
1462         }
1463         pthread_mutex_unlock( ctx->rtgate );
1464 }
1465 #endif