9b0110052ac0125663a9f9eda9969b41674a596e
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58 } thing_list_t;
59
60 // ---- debugging/testing -------------------------------------------------------------------------
61
62 /*
63         Dump stats for an endpoint in the RT.
64 */
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
66         int*    counter;
67         endpoint_t* ep;
68
69         if( (ep = (endpoint_t *) thing) == NULL ) {
70                 return;
71         }
72
73         if( (counter = (int *) vcounter) != NULL ) {
74                 (*counter)++;
75         }
76
77         fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
78 }
79
80 /*
81         Dump stats for a route entry in the table.
82 */
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
84         int*    counter;
85         rtable_ent_t* rte;                      // thing is really an rte
86         int             mtype;
87         int             sid;
88
89         if( (rte = (rtable_ent_t *) thing) == NULL ) {
90                 return;
91         }
92
93         if( (counter = (int *) vcounter) != NULL ) {
94                 (*counter)++;
95         }
96
97         mtype = rte->key & 0xffff;
98         sid = (int) (rte->key >> 32);
99
100         fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
101 }
102
103 /*
104         Given a route table, cause some stats to be spit out.
105 */
106 static void  rt_stats( route_table_t* rt ) {
107         int* counter;
108
109         if( rt == NULL ) {
110                 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
111                 return;
112         }
113
114         counter = (int *) malloc( sizeof( int ) );
115         *counter = 0;
116         fprintf( stderr, "[DBUG] rtstats:\n" );
117         rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter );                // run endpoints in the active table
118         fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
119
120         *counter = 0;
121         rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter );               // run entries
122         fprintf( stderr, "[DBUG] %d entries\n", *counter );
123
124         free( counter );
125 }
126
127
128 // ------------------------------------------------------------------------------------------------
129 /*
130         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
131         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
132 */
133 static char* clip( char* buf ) {
134         char*   tok;
135
136         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
137                 buf++;
138         }
139
140         if( (tok = strchr( buf, '#' )) != NULL ) {
141                 if( tok == buf ) {
142                         return buf;                                     // just push back; leading comment sym handled there
143                 }
144
145                 if( isspace( *(tok-1) ) ) {
146                         *tok = 0;
147                 }
148         }
149
150         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
151         *(tok+1) = 0;
152
153         return buf;
154 }
155
156 /*
157         This accepts a pointer to a nil terminated string, and ensures that there is a
158         newline as the last character. If there is not, a new buffer is allocated and
159         the newline is added.  If a new buffer is allocated, the buffer passed in is
160         freed.  The function returns a pointer which the caller should use, and must
161         free.  In the event of an error, a nil pointer is returned.
162 */
163 static char* ensure_nlterm( char* buf ) {
164         char*   nb = NULL;
165         int             len = 1;
166         
167
168         nb = buf;
169         if( buf == NULL || (len = strlen( buf )) < 2 ) {
170                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
171                         *nb = '\n';
172                         *(nb+1) = 0;
173                 }
174         } else {
175                 if( buf[len-1] != '\n' ) {
176                         fprintf( stderr, "[WARN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
177                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
178                                 memcpy( nb, buf, len );
179                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
180                                 *(nb+len+1) = 0;
181                                 free( buf );
182                         }       
183                 }
184         }
185
186         return nb;
187 }
188
189 /*
190         Given a message type create a route table entry and add to the hash keyed on the
191         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
192         is the number of group slots to allocate in the entry.
193 */
194 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
195         rtable_ent_t* rte;
196         rtable_ent_t* old_rte;          // entry which was already in the table for the key
197
198         if( rt == NULL ) {
199                 return NULL;
200         }
201
202         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
203                 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
204                 return NULL;
205         }
206         memset( rte, 0, sizeof( *rte ) );
207         rte->refs = 1;
208         rte->key = key;
209
210         if( nrrgroups <= 0 ) {
211                 nrrgroups = 10;
212         }
213
214         if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
215                 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
216                 free( rte );
217                 return NULL;
218         }
219         memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
220         rte->nrrgroups = nrrgroups;
221
222         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
223                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
224         }
225
226         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
227
228         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
229         return rte;
230 }
231
232 /*
233         This accepts partially parsed information from a record sent by route manager or read from
234         a file such that:
235                 ts_field is the msg-type,sender field
236                 subid is the integer subscription id
237                 rr_field is the endpoint information for round robening message over
238
239         If all goes well, this will add an RTE to the table under construction.
240
241         The ts_field is checked to see if we should ingest this record. We ingest if one of
242         these is true:
243                 there is no sender info (a generic entry for all)
244                 there is sender and our host:port matches one of the senders
245                 the sender info is an IP address that matches one of our IP addresses
246 */
247 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
248         rtable_ent_t*   rte;            // route table entry added
249         char*   tok;
250         int             ntoks;
251         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
252         char*   tokens[128];
253         char*   gtokens[64];
254         int             i;
255         int             ngtoks;                         // number of tokens in the group list
256         int             grp;                            // index into group list
257
258         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
259         rr_field = clip( rr_field );
260
261         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
262                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
263                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
264
265                         key = build_rt_key( subid, atoi( ts_field ) );
266
267                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
268
269                         if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
270                                 rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
271
272                                 for( grp = 0; grp < ngtoks; grp++ ) {
273                                         if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
274                                                 for( i = 0; i < ntoks; i++ ) {
275                                                         if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", ts_field );
276                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
277                                                 }
278                                         }
279                                 }
280                         }
281                 } else {
282                         if( DEBUG || (vlevel > 2) )
283                                 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
284                 }
285 }
286
287 /*
288         Trash_entry takes a partially parsed record from the input and
289         will delete the entry if the sender,mtype matches us or it's a
290         generic mtype. The refernce in the new table is removed and the
291         refcounter for the actual rte is decreased. If that ref count is
292         0 then the memory is freed (handled byh the del_rte call).
293 */
294 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
295         rtable_ent_t*   rte;            // route table entry to be 'deleted'
296         char*   tok;
297         int             ntoks;
298         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
299         char*   tokens[128];
300
301         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
302                 return;
303         }
304
305         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
306
307         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
308                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
309                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
310
311                 key = build_rt_key( subid, atoi( ts_field ) );
312                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
313                 if( rte != NULL ) {
314                         if( DEBUG || (vlevel > 1) ) {
315                                  fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
316                         }
317                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
318                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
319                 } else {
320                         if( DEBUG || (vlevel > 1) ) {
321                                 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
322                         }
323                 }
324         } else {
325                 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
326         }
327 }
328
329 /*
330         Parse a single record recevied from the route table generator, or read
331         from a static route table file.  Start records cause a new table to
332         be started (if a partial table was received it is discarded. Table
333         entry records are added to the currenly 'in progress' table, and an
334         end record causes the in progress table to be finalised and the
335         currently active table is replaced.
336
337         We expect one of several types:
338                 newrt|{start|end}
339                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
340                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
341 */
342 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
343         int i;
344         int ntoks;                                                      // number of tokens found in something
345         int ngtoks;
346         int     grp;                                                    // group number
347         rtable_ent_t*   rte;                            // route table entry added
348         char*   tokens[128];
349         char*   gtokens[64];                            // groups
350         char*   tok;                                            // pointer into a token or string
351
352         if( ! buf ) {
353                 return;
354         }
355
356         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
357                 buf++;
358         }
359         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
360         *(tok+1) = 0;
361
362         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
363                 switch( *(tokens[0]) ) {
364                         case 0:                                                                                                 // ignore blanks
365                                 // fallthrough
366                         case '#':                                                                                               // and comment lines
367                                 break;
368
369                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
370                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
371                                         break;
372                                 }
373
374                                 if( ntoks < 3 ) {
375                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
376                                         break;
377                                 }
378
379                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
380                                 ctx->new_rtable->updates++;
381                                 break;
382
383                         case 'n':                                                                                               // newrt|{start|end}
384                                 tokens[1] = clip( tokens[1] );
385                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
386                                         if( ctx->new_rtable ) {
387                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
388                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
389                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
390                                                 ctx->new_rtable = NULL;
391                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
392
393                                                 if( vlevel > 0 ) {
394                                                         fprintf( stderr, "[DBUG] old route table:\n" );
395                                                         rt_stats( ctx->old_rtable );
396                                                         fprintf( stderr, "[DBUG] new route table:\n" );
397                                                         rt_stats( ctx->rtable );
398                                                 }
399                                         } else {
400                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
401                                                 ctx->new_rtable = NULL;
402                                         }
403                                 } else {                                                                                        // start a new table.
404                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
405                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
406                                                 uta_rt_drop( ctx->new_rtable );
407                                         }
408
409                                         if( ctx->rtable )  {
410                                                 ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
411                                         } else {
412                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
413                                         }
414                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
415                                 }
416                                 break;
417
418                         case 'm':                                       // assume mse entry
419                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
420                                         break;
421                                 }
422
423                                 if( ntoks < 4 ) {
424                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
425                                         break;
426                                 }
427
428                                 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
429                                 ctx->new_rtable->updates++;
430                                 break;
431
432                         case 'r':                                       // assume rt entry
433                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
434                                         break;
435                                 }
436
437                                 ctx->new_rtable->updates++;
438                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
439                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
440                                 } else {
441                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
442                                 }
443                                 break;
444
445                         case 'u':                                                                                               // update current table, not a total replacement
446                                 tokens[1] = clip( tokens[1] );
447                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
448                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
449                                                 break;
450                                         }
451
452                                         if( ntoks >2 ) {
453                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
454                                                         fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
455                                                                 ctx->new_rtable->updates, tokens[2] );
456                                                         uta_rt_drop( ctx->new_rtable );
457                                                         ctx->new_rtable = NULL;
458                                                         break;
459                                                 }
460                                         }
461
462                                         if( ctx->new_rtable ) {
463                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
464                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
465                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
466                                                 ctx->new_rtable = NULL;
467                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
468
469                                                 if( vlevel > 0 ) {
470                                                         fprintf( stderr, "[DBUG] old route table:\n" );
471                                                         rt_stats( ctx->old_rtable );
472                                                         fprintf( stderr, "[DBUG] updated route table:\n" );
473                                                         rt_stats( ctx->rtable );
474                                                 }
475                                         } else {
476                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
477                                                 ctx->new_rtable = NULL;
478                                         }
479                                 } else {                                                                                        // start a new table.
480                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
481                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
482                                                 uta_rt_drop( ctx->new_rtable );
483                                         }
484
485                                         if( ctx->rtable )  {
486                                                 ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
487                                         } else {
488                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
489                                         }
490
491                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
492                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
493                                 }
494                                 break;
495
496                         default:
497                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
498                                 break;
499                 }
500         }
501 }
502
503 /*
504         This function attempts to open a static route table in order to create a 'seed'
505         table during initialisation.  The environment variable RMR_SEED_RT is expected
506         to contain the necessary path to the file. If missing, or if the file is empty,
507         no route table will be available until one is received from the generator.
508
509         This function is probably most useful for testing situations, or extreme
510         cases where the routes are static.
511 */
512 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
513         int             i;
514         char*   fname;
515         char*   fbuf;                           // buffer with file contents
516         char*   rec;                            // start of the record
517         char*   eor;                            // end of the record
518         int             rcount = 0;                     // record count for debug
519
520         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
521                 return;
522         }
523
524         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
525                 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
526                 return;
527         }
528
529         if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
530         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
531                 if( *eor == '\r' ) {
532                         *eor = '\n';                                                            // will look like a blank line which is ok
533                 }
534         }
535
536         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
537                 rcount++;
538                 if( (eor = strchr( rec, '\n' )) != NULL ) {
539                         *eor = 0;
540                 } else {
541                         fprintf( stderr, "[WARN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
542                         fprintf( stderr, "[WARN] rmr read_static: seed route table not used: %s\n", fname );
543                         free( fbuf );
544                         return;
545                 }
546
547                 parse_rt_rec( ctx, rec, vlevel );
548         }
549
550         if( DEBUG ) fprintf( stderr, "[DBUG] rmr:  seed route table successfully parsed: %d records\n", rcount );
551         free( fbuf );
552 }
553
554 /*
555         Callback driven for each named thing in a symtab. We collect the pointers to those
556         things for later use (cloning).
557 */
558 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
559         thing_list_t*   tl;
560
561         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
562                 return;
563         }
564
565         if( thing == NULL ) {
566                 return;
567         }
568
569         tl->things[tl->nused++] = thing;                // save a reference to the thing
570 }
571
572 /*
573         Called to delete a route table entry struct. We delete the array of endpoint
574         pointers, but NOT the endpoints referenced as those are referenced from
575         multiple entries.
576
577         Route table entries can be concurrently referenced by multiple symtabs, so
578         the actual delete happens only if decrementing the rte's ref count takes it
579         to 0. Thus, it is safe to call this function across a symtab when cleaning up
580         the symtab, or overlaying an entry.
581
582         This function uses ONLY the pointer to the rte (thing) and ignores the other
583         information that symtab foreach function passes (st, entry, and data) which
584         means that it _can_ safetly be used outside of the foreach setting. If
585         the function is changed to depend on any of these three, then a stand-alone
586         rte_cleanup() function should be added and referenced by this, and refererences
587         to this outside of the foreach world should be changed.
588 */
589 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
590         rtable_ent_t*   rte;
591         int i;
592
593         if( (rte = (rtable_ent_t *) thing) == NULL ) {
594                 return;
595         }
596
597         rte->refs--;
598         if( rte->refs > 0 ) {                   // something still referencing, so it lives
599                 return;
600         }
601
602         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
603                 for( i = 0; i < rte->nrrgroups; i++ ) {
604                         if( rte->rrgroups[i] ) {
605                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
606                         }
607                 }
608
609                 free( rte->rrgroups );
610         }
611
612         free( rte );                                                                                    // finally, drop the potato
613 }
614
615 /*
616         Read an entire file into a buffer. We assume for route table files
617         they will be smallish and so this won't be a problem.
618         Returns a pointer to the buffer, or nil. Caller must free.
619         Terminates the buffer with a nil character for string processing.
620
621         If we cannot stat the file, we assume it's empty or missing and return
622         an empty buffer, as opposed to a nil, so the caller can generate defaults
623         or error if an empty/missing file isn't tolerated.
624 */
625 static char* uta_fib( char* fname ) {
626         struct stat     stats;
627         off_t           fsize = 8192;   // size of the file
628         off_t           nread;                  // number of bytes read
629         int                     fd;
630         char*           buf;                    // input buffer
631
632         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
633                 if( fstat( fd, &stats ) >= 0 ) {
634                         if( stats.st_size <= 0 ) {                                      // empty file
635                                 close( fd );
636                                 fd = -1;
637                         } else {
638                                 fsize = stats.st_size;                                          // stat ok, save the file size
639                         }
640                 } else {
641                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
642                 }
643         }
644
645         if( fd < 0 ) {                                                                                  // didn't open or empty
646                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
647                         return NULL;
648                 }
649
650                 *buf = 0;
651                 return buf;
652         }
653
654         // add a size limit check here
655
656         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
657                 close( fd );
658                 errno = ENOMEM;
659                 return NULL;
660         }
661
662         nread = read( fd, buf, fsize );
663         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
664                 free( buf );
665                 errno = EFBIG;                                                                                  // likely too much to handle
666                 close( fd );
667                 return NULL;
668         }
669
670         buf[nread] = 0;
671
672         close( fd );
673         return buf;
674 }
675
676 /*
677         Create and initialise a route table; Returns a pointer to the table struct.
678 */
679 static route_table_t* uta_rt_init( ) {
680         route_table_t*  rt;
681
682         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
683                 return NULL;
684         }
685
686         if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) {               // modest size, prime
687                 free( rt );
688                 return NULL;
689         }
690
691         return rt;
692 }
693
694 /*
695         Clone (sort of) an existing route table.  This is done to preserve the endpoint
696         names referenced in a table (and thus existing sessions) when a new set
697         of message type to endpoint name mappings is received.  A new route table
698         with only endpoint name references is returned based on the active table in
699         the context.
700 */
701 static route_table_t* uta_rt_clone( route_table_t* srt ) {
702         endpoint_t*             ep;             // an endpoint
703         route_table_t*  nrt;    // new route table
704         void*   sst;                    // source symtab
705         void*   nst;                    // new symtab
706         thing_list_t things;
707         int i;
708
709         if( srt == NULL ) {
710                 return NULL;
711         }
712
713         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
714                 return NULL;
715         }
716
717         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
718                 free( nrt );
719                 return NULL;
720         }
721
722         things.nalloc = 2048;
723         things.nused = 0;
724         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
725         if( things.things == NULL ) {
726                 free( nrt->hash );
727                 free( nrt );
728                 return NULL;
729         }
730
731         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
732         nst = nrt->hash;
733
734         rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
735
736         for( i = 0; i < things.nused; i++ ) {
737                 ep = (endpoint_t *) things.things[i];
738                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
739         }
740
741         free( things.things );
742         return nrt;
743 }
744
745 /*
746         Clones _all_ of the given route table (references both endpoints AND the route table
747         entries. Needed to support a partial update where some route table entries will not
748         be deleted if not explicitly in the update.
749 */
750 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
751         endpoint_t*             ep;             // an endpoint
752         rtable_ent_t*   rte;    // a route table entry
753         route_table_t*  nrt;    // new route table
754         void*   sst;                    // source symtab
755         void*   nst;                    // new symtab
756         thing_list_t things0;   // things from space 0 (table entries)
757         thing_list_t things1;   // things from space 1 (end points)
758         int i;
759
760         if( srt == NULL ) {
761                 return NULL;
762         }
763
764         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
765                 return NULL;
766         }
767
768         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
769                 free( nrt );
770                 return NULL;
771         }
772
773         things0.nalloc = 2048;
774         things0.nused = 0;
775         things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
776         if( things0.things == NULL ) {
777                 free( nrt->hash );
778                 free( nrt );
779                 return NULL;
780         }
781
782         things1.nalloc = 2048;
783         things1.nused = 0;
784         things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
785         if( things1.things == NULL ) {
786                 free( nrt->hash );
787                 free( nrt );
788                 return NULL;
789         }
790
791         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
792         nst = nrt->hash;
793
794         rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
795         rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
796
797         for( i = 0; i < things0.nused; i++ ) {
798                 rte = (rtable_ent_t *) things0.things[i];
799                 rte->refs++;                                                                                            // rtes can be removed, so we track references
800                 rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
801         }
802
803         for( i = 0; i < things1.nused; i++ ) {
804                 ep = (endpoint_t *) things1.things[i];
805                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
806         }
807
808         free( things0.things );
809         free( things1.things );
810         return nrt;
811 }
812
813 /*
814         Given a name, find the endpoint struct in the provided route table.
815 */
816 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
817
818         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
819                 return NULL;
820         }
821
822         return rmr_sym_get( rt->hash, ep_name, 1 );
823 }
824
825 /*
826         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
827 */
828 static void uta_rt_drop( route_table_t* rt ) {
829         if( rt == NULL ) {
830                 return;
831         }
832
833         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
834         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
835         free( rt );
836 }
837
838 /*
839         Look up and return the pointer to the endpoint stuct matching the given name.
840         If not in the hash, a new endpoint is created, added to the hash. Should always
841         return a pointer.
842 */
843 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
844         endpoint_t*     ep;
845
846         if( !rt || !ep_name || ! *ep_name ) {
847                 fprintf( stderr, "[WARN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
848                 errno = EINVAL;
849                 return NULL;
850         }
851
852         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
853                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
854                         fprintf( stderr, "[WARN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
855                         errno = ENOMEM;
856                         return NULL;
857                 }
858
859                 ep->open = 0;                                                           // not connected
860                 ep->addr = uta_h2ip( ep_name );
861                 ep->name = strdup( ep_name );
862                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
863
864                 rmr_sym_put( rt->hash, ep_name, 1, ep );
865         }
866
867         return ep;
868 }
869
870
871 /*
872         Given a session id and message type build a key that can be used to look up the rte in the route
873         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
874 */
875 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
876         uint64_t key;
877
878         if( sub_id == UNSET_SUBID ) {
879                 key = 0xffffffff00000000 | mtype;
880         } else {
881                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
882         }
883
884         return key;
885 }
886
887
888 #endif