d19c839371b32b596147e590071b8bad0f9b3615
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58 } thing_list_t;
59
60 // ---- debugging/testing -------------------------------------------------------------------------
61
62 /*
63         Dump stats for an endpoint in the RT.
64 */
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
66         int*    counter;
67         endpoint_t* ep;
68
69         if( (ep = (endpoint_t *) thing) == NULL ) {
70                 return;
71         }
72
73         if( (counter = (int *) vcounter) != NULL ) {
74                 (*counter)++;
75         }
76
77         fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
78 }
79
80 /*
81         Dump stats for a route entry in the table.
82 */
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
84         int*    counter;
85         rtable_ent_t* rte;                      // thing is really an rte
86         int             mtype;
87         int             sid;
88
89         if( (rte = (rtable_ent_t *) thing) == NULL ) {
90                 return;
91         }
92
93         if( (counter = (int *) vcounter) != NULL ) {
94                 (*counter)++;
95         }
96
97         mtype = rte->key & 0xffff;
98         sid = (int) (rte->key >> 32);
99
100         fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
101 }
102
103 /*
104         Given a route table, cause some stats to be spit out.
105 */
106 static void  rt_stats( route_table_t* rt ) {
107         int* counter;
108
109         if( rt == NULL ) {
110                 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
111                 return;
112         }
113
114         counter = (int *) malloc( sizeof( int ) );
115         *counter = 0;
116         fprintf( stderr, "[DBUG] rtstats:\n" );
117         rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter );                // run endpoints in the active table
118         fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
119
120         *counter = 0;
121         rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter );               // run entries
122         fprintf( stderr, "[DBUG] %d entries\n", *counter );
123
124         free( counter );
125 }
126
127
128 // ------------------------------------------------------------------------------------------------
129 /*
130         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
131         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
132 */
133 static char* clip( char* buf ) {
134         char*   tok;
135
136         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
137                 buf++;
138         }
139
140         if( (tok = strchr( buf, '#' )) != NULL ) {
141                 if( tok == buf ) {
142                         return buf;                                     // just push back; leading comment sym handled there
143                 }
144
145                 if( isspace( *(tok-1) ) ) {
146                         *tok = 0;
147                 }
148         }
149
150         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
151         *(tok+1) = 0;
152
153         return buf;
154 }
155
156 /*
157         This accepts a pointer to a nil terminated string, and ensures that there is a
158         newline as the last character. If there is not, a new buffer is allocated and
159         the newline is added.  If a new buffer is allocated, the buffer passed in is
160         freed.  The function returns a pointer which the caller should use, and must
161         free.  In the event of an error, a nil pointer is returned.
162 */
163 static char* ensure_nlterm( char* buf ) {
164         char*   nb = NULL;
165         int             len = 1;
166         
167
168         nb = buf;
169         if( buf == NULL || (len = strlen( buf )) < 2 ) {
170                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
171                         *nb = '\n';
172                         *(nb+1) = 0;
173                 }
174         } else {
175                 if( buf[len-1] != '\n' ) {
176                         fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
177                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
178                                 memcpy( nb, buf, len );
179                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
180                                 *(nb+len+1) = 0;
181                                 free( buf );
182                         }       
183                 }
184         }
185
186         return nb;
187 }
188
189 /*
190         Given a message type create a route table entry and add to the hash keyed on the
191         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
192         is the number of group slots to allocate in the entry.
193 */
194 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
195         rtable_ent_t* rte;
196         rtable_ent_t* old_rte;          // entry which was already in the table for the key
197
198         if( rt == NULL ) {
199                 return NULL;
200         }
201
202         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
203                 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
204                 return NULL;
205         }
206         memset( rte, 0, sizeof( *rte ) );
207         rte->refs = 1;
208         rte->key = key;
209
210         if( nrrgroups <= 0 ) {
211                 nrrgroups = 10;
212         }
213
214         if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
215                 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
216                 free( rte );
217                 return NULL;
218         }
219         memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
220         rte->nrrgroups = nrrgroups;
221
222         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
223                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
224         }
225
226         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
227
228         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
229         return rte;
230 }
231
232 /*
233         This accepts partially parsed information from a record sent by route manager or read from
234         a file such that:
235                 ts_field is the msg-type,sender field
236                 subid is the integer subscription id
237                 rr_field is the endpoint information for round robening message over
238
239         If all goes well, this will add an RTE to the table under construction.
240
241         The ts_field is checked to see if we should ingest this record. We ingest if one of
242         these is true:
243                 there is no sender info (a generic entry for all)
244                 there is sender and our host:port matches one of the senders
245                 the sender info is an IP address that matches one of our IP addresses
246 */
247 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
248         rtable_ent_t*   rte;            // route table entry added
249         char*   tok;
250         int             ntoks;
251         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
252         char*   tokens[128];
253         char*   gtokens[64];
254         int             i;
255         int             ngtoks;                         // number of tokens in the group list
256         int             grp;                            // index into group list
257
258         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
259         rr_field = clip( rr_field );
260
261         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
262                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
263                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
264
265                         key = build_rt_key( subid, atoi( ts_field ) );
266
267                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
268
269                         if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
270                                 rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
271
272                                 for( grp = 0; grp < ngtoks; grp++ ) {
273                                         if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
274                                                 for( i = 0; i < ntoks; i++ ) {
275                                                         if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
276                                                                 if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint  ts=%s %s\n", ts_field, tokens[i] );
277                                                                 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
278                                                         }
279                                                 }
280                                         }
281                                 }
282                         }
283                 } else {
284                         if( DEBUG || (vlevel > 2) ) {
285                                 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
286                         }
287                 }
288 }
289
290 /*
291         Trash_entry takes a partially parsed record from the input and
292         will delete the entry if the sender,mtype matches us or it's a
293         generic mtype. The refernce in the new table is removed and the
294         refcounter for the actual rte is decreased. If that ref count is
295         0 then the memory is freed (handled byh the del_rte call).
296 */
297 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
298         rtable_ent_t*   rte;            // route table entry to be 'deleted'
299         char*   tok;
300         int             ntoks;
301         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
302         char*   tokens[128];
303
304         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
305                 return;
306         }
307
308         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
309
310         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
311                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
312                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
313
314                 key = build_rt_key( subid, atoi( ts_field ) );
315                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
316                 if( rte != NULL ) {
317                         if( DEBUG || (vlevel > 1) ) {
318                                  fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
319                         }
320                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
321                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
322                 } else {
323                         if( DEBUG || (vlevel > 1) ) {
324                                 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
325                         }
326                 }
327         } else {
328                 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
329         }
330 }
331
332 /*
333         Parse a single record recevied from the route table generator, or read
334         from a static route table file.  Start records cause a new table to
335         be started (if a partial table was received it is discarded. Table
336         entry records are added to the currenly 'in progress' table, and an
337         end record causes the in progress table to be finalised and the
338         currently active table is replaced.
339
340         We expect one of several types:
341                 newrt|{start|end}
342                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
343                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
344 */
345 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
346         int i;
347         int ntoks;                                                      // number of tokens found in something
348         int ngtoks;
349         int     grp;                                                    // group number
350         rtable_ent_t*   rte;                            // route table entry added
351         char*   tokens[128];
352         char*   gtokens[64];                            // groups
353         char*   tok;                                            // pointer into a token or string
354
355         if( ! buf ) {
356                 return;
357         }
358
359         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
360                 buf++;
361         }
362         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
363         *(tok+1) = 0;
364
365         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
366                 switch( *(tokens[0]) ) {
367                         case 0:                                                                                                 // ignore blanks
368                                 // fallthrough
369                         case '#':                                                                                               // and comment lines
370                                 break;
371
372                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
373                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
374                                         break;
375                                 }
376
377                                 if( ntoks < 3 ) {
378                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
379                                         break;
380                                 }
381
382                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
383                                 ctx->new_rtable->updates++;
384                                 break;
385
386                         case 'n':                                                                                               // newrt|{start|end}
387                                 tokens[1] = clip( tokens[1] );
388                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
389                                         if( ctx->new_rtable ) {
390                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
391                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
392                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
393                                                 ctx->new_rtable = NULL;
394                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
395
396                                                 if( vlevel > 0 ) {
397                                                         fprintf( stderr, "[DBUG] old route table:\n" );
398                                                         rt_stats( ctx->old_rtable );
399                                                         fprintf( stderr, "[DBUG] new route table:\n" );
400                                                         rt_stats( ctx->rtable );
401                                                 }
402                                         } else {
403                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
404                                                 ctx->new_rtable = NULL;
405                                         }
406                                 } else {                                                                                        // start a new table.
407                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
408                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
409                                                 uta_rt_drop( ctx->new_rtable );
410                                         }
411
412                                         if( ctx->rtable )  {
413                                                 ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
414                                         } else {
415                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
416                                         }
417                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
418                                 }
419                                 break;
420
421                         case 'm':                                       // assume mse entry
422                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
423                                         break;
424                                 }
425
426                                 if( ntoks < 4 ) {
427                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
428                                         break;
429                                 }
430
431                                 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
432                                 ctx->new_rtable->updates++;
433                                 break;
434
435                         case 'r':                                       // assume rt entry
436                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
437                                         break;
438                                 }
439
440                                 ctx->new_rtable->updates++;
441                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
442                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
443                                 } else {
444                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
445                                 }
446                                 break;
447
448                         case 'u':                                                                                               // update current table, not a total replacement
449                                 tokens[1] = clip( tokens[1] );
450                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
451                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
452                                                 break;
453                                         }
454
455                                         if( ntoks >2 ) {
456                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
457                                                         fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
458                                                                 ctx->new_rtable->updates, tokens[2] );
459                                                         uta_rt_drop( ctx->new_rtable );
460                                                         ctx->new_rtable = NULL;
461                                                         break;
462                                                 }
463                                         }
464
465                                         if( ctx->new_rtable ) {
466                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
467                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
468                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
469                                                 ctx->new_rtable = NULL;
470                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
471
472                                                 if( vlevel > 0 ) {
473                                                         fprintf( stderr, "[DBUG] old route table:\n" );
474                                                         rt_stats( ctx->old_rtable );
475                                                         fprintf( stderr, "[DBUG] updated route table:\n" );
476                                                         rt_stats( ctx->rtable );
477                                                 }
478                                         } else {
479                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
480                                                 ctx->new_rtable = NULL;
481                                         }
482                                 } else {                                                                                        // start a new table.
483                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
484                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
485                                                 uta_rt_drop( ctx->new_rtable );
486                                         }
487
488                                         if( ctx->rtable )  {
489                                                 ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
490                                         } else {
491                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
492                                         }
493
494                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
495                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
496                                 }
497                                 break;
498
499                         default:
500                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
501                                 break;
502                 }
503         }
504 }
505
506 /*
507         This function attempts to open a static route table in order to create a 'seed'
508         table during initialisation.  The environment variable RMR_SEED_RT is expected
509         to contain the necessary path to the file. If missing, or if the file is empty,
510         no route table will be available until one is received from the generator.
511
512         This function is probably most useful for testing situations, or extreme
513         cases where the routes are static.
514 */
515 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
516         int             i;
517         char*   fname;
518         char*   fbuf;                           // buffer with file contents
519         char*   rec;                            // start of the record
520         char*   eor;                            // end of the record
521         int             rcount = 0;                     // record count for debug
522
523         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
524                 return;
525         }
526
527         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
528                 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
529                 return;
530         }
531
532         if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
533         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
534                 if( *eor == '\r' ) {
535                         *eor = '\n';                                                            // will look like a blank line which is ok
536                 }
537         }
538
539         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
540                 rcount++;
541                 if( (eor = strchr( rec, '\n' )) != NULL ) {
542                         *eor = 0;
543                 } else {
544                         fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
545                         fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
546                         free( fbuf );
547                         return;
548                 }
549
550                 parse_rt_rec( ctx, rec, vlevel );
551         }
552
553         if( DEBUG ) fprintf( stderr, "[DBUG] rmr:  seed route table successfully parsed: %d records\n", rcount );
554         free( fbuf );
555 }
556
557 /*
558         Callback driven for each named thing in a symtab. We collect the pointers to those
559         things for later use (cloning).
560 */
561 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
562         thing_list_t*   tl;
563
564         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
565                 return;
566         }
567
568         if( thing == NULL ) {
569                 return;
570         }
571
572         tl->things[tl->nused++] = thing;                // save a reference to the thing
573 }
574
575 /*
576         Called to delete a route table entry struct. We delete the array of endpoint
577         pointers, but NOT the endpoints referenced as those are referenced from
578         multiple entries.
579
580         Route table entries can be concurrently referenced by multiple symtabs, so
581         the actual delete happens only if decrementing the rte's ref count takes it
582         to 0. Thus, it is safe to call this function across a symtab when cleaning up
583         the symtab, or overlaying an entry.
584
585         This function uses ONLY the pointer to the rte (thing) and ignores the other
586         information that symtab foreach function passes (st, entry, and data) which
587         means that it _can_ safetly be used outside of the foreach setting. If
588         the function is changed to depend on any of these three, then a stand-alone
589         rte_cleanup() function should be added and referenced by this, and refererences
590         to this outside of the foreach world should be changed.
591 */
592 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
593         rtable_ent_t*   rte;
594         int i;
595
596         if( (rte = (rtable_ent_t *) thing) == NULL ) {
597                 return;
598         }
599
600         rte->refs--;
601         if( rte->refs > 0 ) {                   // something still referencing, so it lives
602                 return;
603         }
604
605         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
606                 for( i = 0; i < rte->nrrgroups; i++ ) {
607                         if( rte->rrgroups[i] ) {
608                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
609                         }
610                 }
611
612                 free( rte->rrgroups );
613         }
614
615         free( rte );                                                                                    // finally, drop the potato
616 }
617
618 /*
619         Read an entire file into a buffer. We assume for route table files
620         they will be smallish and so this won't be a problem.
621         Returns a pointer to the buffer, or nil. Caller must free.
622         Terminates the buffer with a nil character for string processing.
623
624         If we cannot stat the file, we assume it's empty or missing and return
625         an empty buffer, as opposed to a nil, so the caller can generate defaults
626         or error if an empty/missing file isn't tolerated.
627 */
628 static char* uta_fib( char* fname ) {
629         struct stat     stats;
630         off_t           fsize = 8192;   // size of the file
631         off_t           nread;                  // number of bytes read
632         int                     fd;
633         char*           buf;                    // input buffer
634
635         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
636                 if( fstat( fd, &stats ) >= 0 ) {
637                         if( stats.st_size <= 0 ) {                                      // empty file
638                                 close( fd );
639                                 fd = -1;
640                         } else {
641                                 fsize = stats.st_size;                                          // stat ok, save the file size
642                         }
643                 } else {
644                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
645                 }
646         }
647
648         if( fd < 0 ) {                                                                                  // didn't open or empty
649                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
650                         return NULL;
651                 }
652
653                 *buf = 0;
654                 return buf;
655         }
656
657         // add a size limit check here
658
659         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
660                 close( fd );
661                 errno = ENOMEM;
662                 return NULL;
663         }
664
665         nread = read( fd, buf, fsize );
666         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
667                 free( buf );
668                 errno = EFBIG;                                                                                  // likely too much to handle
669                 close( fd );
670                 return NULL;
671         }
672
673         buf[nread] = 0;
674
675         close( fd );
676         return buf;
677 }
678
679 /*
680         Create and initialise a route table; Returns a pointer to the table struct.
681 */
682 static route_table_t* uta_rt_init( ) {
683         route_table_t*  rt;
684
685         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
686                 return NULL;
687         }
688
689         if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) {               // modest size, prime
690                 free( rt );
691                 return NULL;
692         }
693
694         return rt;
695 }
696
697 /*
698         Clone (sort of) an existing route table.  This is done to preserve the endpoint
699         names referenced in a table (and thus existing sessions) when a new set
700         of message type to endpoint name mappings is received.  A new route table
701         with only endpoint name references is returned based on the active table in
702         the context.
703 */
704 static route_table_t* uta_rt_clone( route_table_t* srt ) {
705         endpoint_t*             ep;             // an endpoint
706         route_table_t*  nrt;    // new route table
707         void*   sst;                    // source symtab
708         void*   nst;                    // new symtab
709         thing_list_t things;
710         int i;
711
712         if( srt == NULL ) {
713                 return NULL;
714         }
715
716         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
717                 return NULL;
718         }
719
720         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
721                 free( nrt );
722                 return NULL;
723         }
724
725         things.nalloc = 2048;
726         things.nused = 0;
727         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
728         if( things.things == NULL ) {
729                 free( nrt->hash );
730                 free( nrt );
731                 return NULL;
732         }
733
734         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
735         nst = nrt->hash;
736
737         rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
738
739         for( i = 0; i < things.nused; i++ ) {
740                 ep = (endpoint_t *) things.things[i];
741                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
742         }
743
744         free( things.things );
745         return nrt;
746 }
747
748 /*
749         Clones _all_ of the given route table (references both endpoints AND the route table
750         entries. Needed to support a partial update where some route table entries will not
751         be deleted if not explicitly in the update.
752 */
753 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
754         endpoint_t*             ep;             // an endpoint
755         rtable_ent_t*   rte;    // a route table entry
756         route_table_t*  nrt;    // new route table
757         void*   sst;                    // source symtab
758         void*   nst;                    // new symtab
759         thing_list_t things0;   // things from space 0 (table entries)
760         thing_list_t things1;   // things from space 1 (end points)
761         int i;
762
763         if( srt == NULL ) {
764                 return NULL;
765         }
766
767         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
768                 return NULL;
769         }
770
771         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
772                 free( nrt );
773                 return NULL;
774         }
775
776         things0.nalloc = 2048;
777         things0.nused = 0;
778         things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
779         if( things0.things == NULL ) {
780                 free( nrt->hash );
781                 free( nrt );
782                 return NULL;
783         }
784
785         things1.nalloc = 2048;
786         things1.nused = 0;
787         things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
788         if( things1.things == NULL ) {
789                 free( nrt->hash );
790                 free( nrt );
791                 return NULL;
792         }
793
794         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
795         nst = nrt->hash;
796
797         rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
798         rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
799
800         for( i = 0; i < things0.nused; i++ ) {
801                 rte = (rtable_ent_t *) things0.things[i];
802                 rte->refs++;                                                                                            // rtes can be removed, so we track references
803                 rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
804         }
805
806         for( i = 0; i < things1.nused; i++ ) {
807                 ep = (endpoint_t *) things1.things[i];
808                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
809         }
810
811         free( things0.things );
812         free( things1.things );
813         return nrt;
814 }
815
816 /*
817         Given a name, find the endpoint struct in the provided route table.
818 */
819 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
820
821         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
822                 return NULL;
823         }
824
825         return rmr_sym_get( rt->hash, ep_name, 1 );
826 }
827
828 /*
829         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
830 */
831 static void uta_rt_drop( route_table_t* rt ) {
832         if( rt == NULL ) {
833                 return;
834         }
835
836         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
837         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
838         free( rt );
839 }
840
841 /*
842         Look up and return the pointer to the endpoint stuct matching the given name.
843         If not in the hash, a new endpoint is created, added to the hash. Should always
844         return a pointer.
845 */
846 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
847         endpoint_t*     ep;
848
849         if( !rt || !ep_name || ! *ep_name ) {
850                 fprintf( stderr, "[WRN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
851                 errno = EINVAL;
852                 return NULL;
853         }
854
855         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
856                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
857                         fprintf( stderr, "[WRN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
858                         errno = ENOMEM;
859                         return NULL;
860                 }
861
862                 ep->open = 0;                                                           // not connected
863                 ep->addr = uta_h2ip( ep_name );
864                 ep->name = strdup( ep_name );
865                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
866
867                 rmr_sym_put( rt->hash, ep_name, 1, ep );
868         }
869
870         return ep;
871 }
872
873
874 /*
875         Given a session id and message type build a key that can be used to look up the rte in the route
876         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
877 */
878 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
879         uint64_t key;
880
881         if( sub_id == UNSET_SUBID ) {
882                 key = 0xffffffff00000000 | mtype;
883         } else {
884                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
885         }
886
887         return key;
888 }
889
890
891 #endif