Add ability to track send counts for an endpoint
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58 } thing_list_t;
59
60 // ---- debugging/testing -------------------------------------------------------------------------
61
62 /*
63         Dump some stats for an endpoint in the RT. This is generally called to
64         verify endpoints after a table load/change.
65 */
66 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
67         int*    counter;
68         endpoint_t* ep;
69
70         if( (ep = (endpoint_t *) thing) == NULL ) {
71                 return;
72         }
73
74         if( (counter = (int *) vcounter) != NULL ) {
75                 (*counter)++;
76         }
77
78         fprintf( stderr, "[DBUG] RMR sends: target=%s open=%d\n", ep->name, ep->open );
79 }
80
81 /*
82         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
83         the 'source' information and is added to each message.
84 */
85 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
86         endpoint_t* ep;
87         char*   id;
88
89         if( (ep = (endpoint_t *) thing) == NULL ) {
90                 return;
91         }
92
93         if( (id = (char *) vid) == NULL ) {
94                 id = "missing";
95         }
96
97         fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n", 
98                 (long long) time( NULL ), 
99                 id, 
100                 ep->name, 
101                 ep->open, 
102                 ep->scounts[EPSC_GOOD], 
103                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS], 
104                 ep->scounts[EPSC_FAIL], 
105                 ep->scounts[EPSC_TRANS]   );
106 }
107
108 /*
109         Dump stats for a route entry in the table.
110 */
111 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
112         int*    counter;
113         rtable_ent_t* rte;                      // thing is really an rte
114         int             mtype;
115         int             sid;
116
117         if( (rte = (rtable_ent_t *) thing) == NULL ) {
118                 return;
119         }
120
121         if( (counter = (int *) vcounter) != NULL ) {
122                 (*counter)++;
123         }
124
125         mtype = rte->key & 0xffff;
126         sid = (int) (rte->key >> 32);
127
128         fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
129 }
130
131 /*
132         Given a route table, cause some stats to be spit out.
133 */
134 static void  rt_stats( route_table_t* rt ) {
135         int* counter;
136
137         if( rt == NULL ) {
138                 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
139                 return;
140         }
141
142         counter = (int *) malloc( sizeof( int ) );
143         *counter = 0;
144         fprintf( stderr, "[DBUG] rtstats:\n" );
145         rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter );                // run endpoints in the active table
146         fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
147
148         *counter = 0;
149         rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter );               // run entries
150         fprintf( stderr, "[DBUG] %d entries\n", *counter );
151
152         free( counter );
153 }
154
155 /*
156         Given a route table, cause endpoint counters to be written to stderr. The id
157         parm is written as the "source" in the output.
158 */
159 static void  rt_epcounts( route_table_t* rt, char* id ) {
160         if( rt == NULL ) {
161                 fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
162                 return;
163         }
164
165         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
166 }
167
168
169 // ------------------------------------------------------------------------------------------------
170 /*
171         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
172         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
173 */
174 static char* clip( char* buf ) {
175         char*   tok;
176
177         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
178                 buf++;
179         }
180
181         if( (tok = strchr( buf, '#' )) != NULL ) {
182                 if( tok == buf ) {
183                         return buf;                                     // just push back; leading comment sym handled there
184                 }
185
186                 if( isspace( *(tok-1) ) ) {
187                         *tok = 0;
188                 }
189         }
190
191         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
192         *(tok+1) = 0;
193
194         return buf;
195 }
196
197 /*
198         This accepts a pointer to a nil terminated string, and ensures that there is a
199         newline as the last character. If there is not, a new buffer is allocated and
200         the newline is added.  If a new buffer is allocated, the buffer passed in is
201         freed.  The function returns a pointer which the caller should use, and must
202         free.  In the event of an error, a nil pointer is returned.
203 */
204 static char* ensure_nlterm( char* buf ) {
205         char*   nb = NULL;
206         int             len = 1;
207         
208
209         nb = buf;
210         if( buf == NULL || (len = strlen( buf )) < 2 ) {
211                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
212                         *nb = '\n';
213                         *(nb+1) = 0;
214                 }
215         } else {
216                 if( buf[len-1] != '\n' ) {
217                         fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
218                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
219                                 memcpy( nb, buf, len );
220                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
221                                 *(nb+len+1) = 0;
222                         }       
223
224                         free( buf );
225                 }
226         }
227
228         return nb;
229 }
230
231 /*
232         Given a message type create a route table entry and add to the hash keyed on the
233         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
234         is the number of group slots to allocate in the entry.
235 */
236 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
237         rtable_ent_t* rte;
238         rtable_ent_t* old_rte;          // entry which was already in the table for the key
239
240         if( rt == NULL ) {
241                 return NULL;
242         }
243
244         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
245                 fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
246                 return NULL;
247         }
248         memset( rte, 0, sizeof( *rte ) );
249         rte->refs = 1;
250         rte->key = key;
251
252         if( nrrgroups <= 0 ) {
253                 nrrgroups = 10;
254         }
255
256         if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
257                 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
258                 free( rte );
259                 return NULL;
260         }
261         memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
262         rte->nrrgroups = nrrgroups;
263
264         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
265                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
266         }
267
268         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
269
270         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
271         return rte;
272 }
273
274 /*
275         This accepts partially parsed information from a record sent by route manager or read from
276         a file such that:
277                 ts_field is the msg-type,sender field
278                 subid is the integer subscription id
279                 rr_field is the endpoint information for round robening message over
280
281         If all goes well, this will add an RTE to the table under construction.
282
283         The ts_field is checked to see if we should ingest this record. We ingest if one of
284         these is true:
285                 there is no sender info (a generic entry for all)
286                 there is sender and our host:port matches one of the senders
287                 the sender info is an IP address that matches one of our IP addresses
288 */
289 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
290         rtable_ent_t*   rte;            // route table entry added
291         char*   tok;
292         int             ntoks;
293         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
294         char*   tokens[128];
295         char*   gtokens[64];
296         int             i;
297         int             ngtoks;                         // number of tokens in the group list
298         int             grp;                            // index into group list
299
300         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
301         rr_field = clip( rr_field );
302
303         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
304                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
305                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
306
307                         key = build_rt_key( subid, atoi( ts_field ) );
308
309                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
310
311                         if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
312                                 rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
313
314                                 for( grp = 0; grp < ngtoks; grp++ ) {
315                                         if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
316                                                 for( i = 0; i < ntoks; i++ ) {
317                                                         if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
318                                                                 if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint  ts=%s %s\n", ts_field, tokens[i] );
319                                                                 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
320                                                         }
321                                                 }
322                                         }
323                                 }
324                         }
325                 } else {
326                         if( DEBUG || (vlevel > 2) ) {
327                                 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
328                         }
329                 }
330 }
331
332 /*
333         Trash_entry takes a partially parsed record from the input and
334         will delete the entry if the sender,mtype matches us or it's a
335         generic mtype. The refernce in the new table is removed and the
336         refcounter for the actual rte is decreased. If that ref count is
337         0 then the memory is freed (handled byh the del_rte call).
338 */
339 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
340         rtable_ent_t*   rte;            // route table entry to be 'deleted'
341         char*   tok;
342         int             ntoks;
343         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
344         char*   tokens[128];
345
346         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
347                 return;
348         }
349
350         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
351
352         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
353                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
354                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
355
356                 key = build_rt_key( subid, atoi( ts_field ) );
357                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
358                 if( rte != NULL ) {
359                         if( DEBUG || (vlevel > 1) ) {
360                                  fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
361                         }
362                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
363                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
364                 } else {
365                         if( DEBUG || (vlevel > 1) ) {
366                                 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
367                         }
368                 }
369         } else {
370                 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
371         }
372 }
373
374 /*
375         Parse a single record recevied from the route table generator, or read
376         from a static route table file.  Start records cause a new table to
377         be started (if a partial table was received it is discarded. Table
378         entry records are added to the currenly 'in progress' table, and an
379         end record causes the in progress table to be finalised and the
380         currently active table is replaced.
381
382         We expect one of several types:
383                 newrt|{start|end}
384                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
385                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
386 */
387 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
388         int i;
389         int ntoks;                                                      // number of tokens found in something
390         int ngtoks;
391         int     grp;                                                    // group number
392         rtable_ent_t*   rte;                            // route table entry added
393         char*   tokens[128];
394         char*   gtokens[64];                            // groups
395         char*   tok;                                            // pointer into a token or string
396
397         if( ! buf ) {
398                 return;
399         }
400
401         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
402                 buf++;
403         }
404         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
405         *(tok+1) = 0;
406
407         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
408                 switch( *(tokens[0]) ) {
409                         case 0:                                                                                                 // ignore blanks
410                                 // fallthrough
411                         case '#':                                                                                               // and comment lines
412                                 break;
413
414                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
415                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
416                                         break;
417                                 }
418
419                                 if( ntoks < 3 ) {
420                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
421                                         break;
422                                 }
423
424                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
425                                 ctx->new_rtable->updates++;
426                                 break;
427
428                         case 'n':                                                                                               // newrt|{start|end}
429                                 tokens[1] = clip( tokens[1] );
430                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
431                                         if( ctx->new_rtable ) {
432                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
433                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
434                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
435                                                 ctx->new_rtable = NULL;
436                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
437
438                                                 if( vlevel > 0 ) {
439                                                         fprintf( stderr, "[DBUG] old route table:\n" );
440                                                         rt_stats( ctx->old_rtable );
441                                                         fprintf( stderr, "[DBUG] new route table:\n" );
442                                                         rt_stats( ctx->rtable );
443                                                 }
444                                         } else {
445                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
446                                                 ctx->new_rtable = NULL;
447                                         }
448                                 } else {                                                                                        // start a new table.
449                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
450                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
451                                                 uta_rt_drop( ctx->new_rtable );
452                                         }
453
454                                         if( ctx->rtable )  {
455                                                 ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
456                                         } else {
457                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
458                                         }
459                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
460                                 }
461                                 break;
462
463                         case 'm':                                       // assume mse entry
464                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
465                                         break;
466                                 }
467
468                                 if( ntoks < 4 ) {
469                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
470                                         break;
471                                 }
472
473                                 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
474                                 ctx->new_rtable->updates++;
475                                 break;
476
477                         case 'r':                                       // assume rt entry
478                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
479                                         break;
480                                 }
481
482                                 ctx->new_rtable->updates++;
483                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
484                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
485                                 } else {
486                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
487                                 }
488                                 break;
489
490                         case 'u':                                                                                               // update current table, not a total replacement
491                                 tokens[1] = clip( tokens[1] );
492                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
493                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
494                                                 break;
495                                         }
496
497                                         if( ntoks >2 ) {
498                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
499                                                         fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
500                                                                 ctx->new_rtable->updates, tokens[2] );
501                                                         uta_rt_drop( ctx->new_rtable );
502                                                         ctx->new_rtable = NULL;
503                                                         break;
504                                                 }
505                                         }
506
507                                         if( ctx->new_rtable ) {
508                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
509                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
510                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
511                                                 ctx->new_rtable = NULL;
512                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
513
514                                                 if( vlevel > 0 ) {
515                                                         fprintf( stderr, "[DBUG] old route table:\n" );
516                                                         rt_stats( ctx->old_rtable );
517                                                         fprintf( stderr, "[DBUG] updated route table:\n" );
518                                                         rt_stats( ctx->rtable );
519                                                 }
520                                         } else {
521                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
522                                                 ctx->new_rtable = NULL;
523                                         }
524                                 } else {                                                                                        // start a new table.
525                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
526                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
527                                                 uta_rt_drop( ctx->new_rtable );
528                                         }
529
530                                         if( ctx->rtable )  {
531                                                 ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
532                                         } else {
533                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
534                                         }
535
536                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
537                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
538                                 }
539                                 break;
540
541                         default:
542                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
543                                 break;
544                 }
545         }
546 }
547
548 /*
549         This function attempts to open a static route table in order to create a 'seed'
550         table during initialisation.  The environment variable RMR_SEED_RT is expected
551         to contain the necessary path to the file. If missing, or if the file is empty,
552         no route table will be available until one is received from the generator.
553
554         This function is probably most useful for testing situations, or extreme
555         cases where the routes are static.
556 */
557 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
558         int             i;
559         char*   fname;
560         char*   fbuf;                           // buffer with file contents
561         char*   rec;                            // start of the record
562         char*   eor;                            // end of the record
563         int             rcount = 0;                     // record count for debug
564
565         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
566                 return;
567         }
568
569         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
570                 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
571                 return;
572         }
573
574         if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
575         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
576                 if( *eor == '\r' ) {
577                         *eor = '\n';                                                            // will look like a blank line which is ok
578                 }
579         }
580
581         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
582                 rcount++;
583                 if( (eor = strchr( rec, '\n' )) != NULL ) {
584                         *eor = 0;
585                 } else {
586                         fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
587                         fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
588                         free( fbuf );
589                         return;
590                 }
591
592                 parse_rt_rec( ctx, rec, vlevel );
593         }
594
595         if( DEBUG ) fprintf( stderr, "[DBUG] rmr:  seed route table successfully parsed: %d records\n", rcount );
596         free( fbuf );
597 }
598
599 /*
600         Callback driven for each named thing in a symtab. We collect the pointers to those
601         things for later use (cloning).
602 */
603 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
604         thing_list_t*   tl;
605
606         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
607                 return;
608         }
609
610         if( thing == NULL ) {
611                 return;
612         }
613
614         tl->things[tl->nused++] = thing;                // save a reference to the thing
615 }
616
617 /*
618         Called to delete a route table entry struct. We delete the array of endpoint
619         pointers, but NOT the endpoints referenced as those are referenced from
620         multiple entries.
621
622         Route table entries can be concurrently referenced by multiple symtabs, so
623         the actual delete happens only if decrementing the rte's ref count takes it
624         to 0. Thus, it is safe to call this function across a symtab when cleaning up
625         the symtab, or overlaying an entry.
626
627         This function uses ONLY the pointer to the rte (thing) and ignores the other
628         information that symtab foreach function passes (st, entry, and data) which
629         means that it _can_ safetly be used outside of the foreach setting. If
630         the function is changed to depend on any of these three, then a stand-alone
631         rte_cleanup() function should be added and referenced by this, and refererences
632         to this outside of the foreach world should be changed.
633 */
634 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
635         rtable_ent_t*   rte;
636         int i;
637
638         if( (rte = (rtable_ent_t *) thing) == NULL ) {
639                 return;
640         }
641
642         rte->refs--;
643         if( rte->refs > 0 ) {                   // something still referencing, so it lives
644                 return;
645         }
646
647         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
648                 for( i = 0; i < rte->nrrgroups; i++ ) {
649                         if( rte->rrgroups[i] ) {
650                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
651                         }
652                 }
653
654                 free( rte->rrgroups );
655         }
656
657         free( rte );                                                                                    // finally, drop the potato
658 }
659
660 /*
661         Read an entire file into a buffer. We assume for route table files
662         they will be smallish and so this won't be a problem.
663         Returns a pointer to the buffer, or nil. Caller must free.
664         Terminates the buffer with a nil character for string processing.
665
666         If we cannot stat the file, we assume it's empty or missing and return
667         an empty buffer, as opposed to a nil, so the caller can generate defaults
668         or error if an empty/missing file isn't tolerated.
669 */
670 static char* uta_fib( char* fname ) {
671         struct stat     stats;
672         off_t           fsize = 8192;   // size of the file
673         off_t           nread;                  // number of bytes read
674         int                     fd;
675         char*           buf;                    // input buffer
676
677         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
678                 if( fstat( fd, &stats ) >= 0 ) {
679                         if( stats.st_size <= 0 ) {                                      // empty file
680                                 close( fd );
681                                 fd = -1;
682                         } else {
683                                 fsize = stats.st_size;                                          // stat ok, save the file size
684                         }
685                 } else {
686                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
687                 }
688         }
689
690         if( fd < 0 ) {                                                                                  // didn't open or empty
691                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
692                         return NULL;
693                 }
694
695                 *buf = 0;
696                 return buf;
697         }
698
699         // add a size limit check here
700
701         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
702                 close( fd );
703                 errno = ENOMEM;
704                 return NULL;
705         }
706
707         nread = read( fd, buf, fsize );
708         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
709                 free( buf );
710                 errno = EFBIG;                                                                                  // likely too much to handle
711                 close( fd );
712                 return NULL;
713         }
714
715         buf[nread] = 0;
716
717         close( fd );
718         return buf;
719 }
720
721 /*
722         Create and initialise a route table; Returns a pointer to the table struct.
723 */
724 static route_table_t* uta_rt_init( ) {
725         route_table_t*  rt;
726
727         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
728                 return NULL;
729         }
730
731         if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) {               // modest size, prime
732                 free( rt );
733                 return NULL;
734         }
735
736         return rt;
737 }
738
739 /*
740         Clone (sort of) an existing route table.  This is done to preserve the endpoint
741         names referenced in a table (and thus existing sessions) when a new set
742         of message type to endpoint name mappings is received.  A new route table
743         with only endpoint name references is returned based on the active table in
744         the context.
745 */
746 static route_table_t* uta_rt_clone( route_table_t* srt ) {
747         endpoint_t*             ep;             // an endpoint
748         route_table_t*  nrt;    // new route table
749         void*   sst;                    // source symtab
750         void*   nst;                    // new symtab
751         thing_list_t things;
752         int i;
753
754         if( srt == NULL ) {
755                 return NULL;
756         }
757
758         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
759                 return NULL;
760         }
761
762         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
763                 free( nrt );
764                 return NULL;
765         }
766
767         things.nalloc = 2048;
768         things.nused = 0;
769         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
770         if( things.things == NULL ) {
771                 free( nrt->hash );
772                 free( nrt );
773                 return NULL;
774         }
775
776         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
777         nst = nrt->hash;
778
779         rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
780
781         for( i = 0; i < things.nused; i++ ) {
782                 ep = (endpoint_t *) things.things[i];
783                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
784         }
785
786         free( things.things );
787         return nrt;
788 }
789
790 /*
791         Clones _all_ of the given route table (references both endpoints AND the route table
792         entries. Needed to support a partial update where some route table entries will not
793         be deleted if not explicitly in the update.
794 */
795 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
796         endpoint_t*             ep;             // an endpoint
797         rtable_ent_t*   rte;    // a route table entry
798         route_table_t*  nrt;    // new route table
799         void*   sst;                    // source symtab
800         void*   nst;                    // new symtab
801         thing_list_t things0;   // things from space 0 (table entries)
802         thing_list_t things1;   // things from space 1 (end points)
803         int i;
804
805         if( srt == NULL ) {
806                 return NULL;
807         }
808
809         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
810                 return NULL;
811         }
812
813         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
814                 free( nrt );
815                 return NULL;
816         }
817
818         things0.nalloc = 2048;
819         things0.nused = 0;
820         things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
821         if( things0.things == NULL ) {
822                 free( nrt->hash );
823                 free( nrt );
824                 return NULL;
825         }
826
827         things1.nalloc = 2048;
828         things1.nused = 0;
829         things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
830         if( things1.things == NULL ) {
831                 free( nrt->hash );
832                 free( nrt );
833                 return NULL;
834         }
835
836         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
837         nst = nrt->hash;
838
839         rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
840         rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
841
842         for( i = 0; i < things0.nused; i++ ) {
843                 rte = (rtable_ent_t *) things0.things[i];
844                 rte->refs++;                                                                                            // rtes can be removed, so we track references
845                 rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
846         }
847
848         for( i = 0; i < things1.nused; i++ ) {
849                 ep = (endpoint_t *) things1.things[i];
850                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
851         }
852
853         free( things0.things );
854         free( things1.things );
855         return nrt;
856 }
857
858 /*
859         Given a name, find the endpoint struct in the provided route table.
860 */
861 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
862
863         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
864                 return NULL;
865         }
866
867         return rmr_sym_get( rt->hash, ep_name, 1 );
868 }
869
870 /*
871         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
872 */
873 static void uta_rt_drop( route_table_t* rt ) {
874         if( rt == NULL ) {
875                 return;
876         }
877
878         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
879         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
880         free( rt );
881 }
882
883 /*
884         Look up and return the pointer to the endpoint stuct matching the given name.
885         If not in the hash, a new endpoint is created, added to the hash. Should always
886         return a pointer.
887 */
888 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
889         endpoint_t*     ep;
890
891         if( !rt || !ep_name || ! *ep_name ) {
892                 fprintf( stderr, "[WRN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
893                 errno = EINVAL;
894                 return NULL;
895         }
896
897         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
898                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
899                         fprintf( stderr, "[WRN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
900                         errno = ENOMEM;
901                         return NULL;
902                 }
903
904                 ep->open = 0;                                                           // not connected
905                 ep->addr = uta_h2ip( ep_name );
906                 ep->name = strdup( ep_name );
907                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
908                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
909
910                 rmr_sym_put( rt->hash, ep_name, 1, ep );
911         }
912
913         return ep;
914 }
915
916
917 /*
918         Given a session id and message type build a key that can be used to look up the rte in the route
919         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
920 */
921 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
922         uint64_t key;
923
924         if( sub_id == UNSET_SUBID ) {
925                 key = 0xffffffff00000000 | mtype;
926         } else {
927                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
928         }
929
930         return key;
931 }
932
933
934 #endif