Address potential error state after good send
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58 } thing_list_t;
59
60 // ---- debugging/testing -------------------------------------------------------------------------
61
62 /*
63         Dump stats for an endpoint in the RT.
64 */
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
66         int*    counter;
67         endpoint_t* ep;
68
69         if( (ep = (endpoint_t *) thing) == NULL ) {
70                 return;
71         }
72
73         if( (counter = (int *) vcounter) != NULL ) {
74                 (*counter)++;
75         }
76
77         fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
78 }
79
80 /*
81         Dump stats for a route entry in the table.
82 */
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
84         int*    counter;
85         rtable_ent_t* rte;                      // thing is really an rte
86         int             mtype;
87         int             sid;
88
89         if( (rte = (rtable_ent_t *) thing) == NULL ) {
90                 return;
91         }
92
93         if( (counter = (int *) vcounter) != NULL ) {
94                 (*counter)++;
95         }
96
97         mtype = rte->key & 0xffff;
98         sid = (int) (rte->key >> 32);
99
100         fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
101 }
102
103 /*
104         Given a route table, cause some stats to be spit out.
105 */
106 static void  rt_stats( route_table_t* rt ) {
107         int* counter;
108
109         if( rt == NULL ) {
110                 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
111                 return;
112         }
113
114         counter = (int *) malloc( sizeof( int ) );
115         *counter = 0;
116         fprintf( stderr, "[DBUG] rtstats:\n" );
117         rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter );                // run endpoints in the active table
118         fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
119
120         *counter = 0;
121         rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter );               // run entries
122         fprintf( stderr, "[DBUG] %d entries\n", *counter );
123
124         free( counter );
125 }
126
127
128 // ------------------------------------------------------------------------------------------------
129 /*
130         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
131         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
132 */
133 static char* clip( char* buf ) {
134         char*   tok;
135
136         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
137                 buf++;
138         }
139
140         if( (tok = strchr( buf, '#' )) != NULL ) {
141                 if( tok == buf ) {
142                         return buf;                                     // just push back; leading comment sym handled there
143                 }
144
145                 if( isspace( *(tok-1) ) ) {
146                         *tok = 0;
147                 }
148         }
149
150         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
151         *(tok+1) = 0;
152
153         return buf;
154 }
155
156 /*
157         This accepts a pointer to a nil terminated string, and ensures that there is a
158         newline as the last character. If there is not, a new buffer is allocated and
159         the newline is added.  If a new buffer is allocated, the buffer passed in is
160         freed.  The function returns a pointer which the caller should use, and must
161         free.  In the event of an error, a nil pointer is returned.
162 */
163 static char* ensure_nlterm( char* buf ) {
164         char*   nb = NULL;
165         int             len = 1;
166         
167
168         nb = buf;
169         if( buf == NULL || (len = strlen( buf )) < 2 ) {
170                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
171                         *nb = '\n';
172                         *(nb+1) = 0;
173                 }
174         } else {
175                 if( buf[len-1] != '\n' ) {
176                         fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
177                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
178                                 memcpy( nb, buf, len );
179                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
180                                 *(nb+len+1) = 0;
181                         }       
182
183                         free( buf );
184                 }
185         }
186
187         return nb;
188 }
189
190 /*
191         Given a message type create a route table entry and add to the hash keyed on the
192         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
193         is the number of group slots to allocate in the entry.
194 */
195 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
196         rtable_ent_t* rte;
197         rtable_ent_t* old_rte;          // entry which was already in the table for the key
198
199         if( rt == NULL ) {
200                 return NULL;
201         }
202
203         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
204                 fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
205                 return NULL;
206         }
207         memset( rte, 0, sizeof( *rte ) );
208         rte->refs = 1;
209         rte->key = key;
210
211         if( nrrgroups <= 0 ) {
212                 nrrgroups = 10;
213         }
214
215         if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
216                 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
217                 free( rte );
218                 return NULL;
219         }
220         memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
221         rte->nrrgroups = nrrgroups;
222
223         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
224                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
225         }
226
227         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
228
229         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
230         return rte;
231 }
232
233 /*
234         This accepts partially parsed information from a record sent by route manager or read from
235         a file such that:
236                 ts_field is the msg-type,sender field
237                 subid is the integer subscription id
238                 rr_field is the endpoint information for round robening message over
239
240         If all goes well, this will add an RTE to the table under construction.
241
242         The ts_field is checked to see if we should ingest this record. We ingest if one of
243         these is true:
244                 there is no sender info (a generic entry for all)
245                 there is sender and our host:port matches one of the senders
246                 the sender info is an IP address that matches one of our IP addresses
247 */
248 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
249         rtable_ent_t*   rte;            // route table entry added
250         char*   tok;
251         int             ntoks;
252         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
253         char*   tokens[128];
254         char*   gtokens[64];
255         int             i;
256         int             ngtoks;                         // number of tokens in the group list
257         int             grp;                            // index into group list
258
259         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
260         rr_field = clip( rr_field );
261
262         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
263                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
264                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
265
266                         key = build_rt_key( subid, atoi( ts_field ) );
267
268                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
269
270                         if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
271                                 rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
272
273                                 for( grp = 0; grp < ngtoks; grp++ ) {
274                                         if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
275                                                 for( i = 0; i < ntoks; i++ ) {
276                                                         if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
277                                                                 if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint  ts=%s %s\n", ts_field, tokens[i] );
278                                                                 uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
279                                                         }
280                                                 }
281                                         }
282                                 }
283                         }
284                 } else {
285                         if( DEBUG || (vlevel > 2) ) {
286                                 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
287                         }
288                 }
289 }
290
291 /*
292         Trash_entry takes a partially parsed record from the input and
293         will delete the entry if the sender,mtype matches us or it's a
294         generic mtype. The refernce in the new table is removed and the
295         refcounter for the actual rte is decreased. If that ref count is
296         0 then the memory is freed (handled byh the del_rte call).
297 */
298 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
299         rtable_ent_t*   rte;            // route table entry to be 'deleted'
300         char*   tok;
301         int             ntoks;
302         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
303         char*   tokens[128];
304
305         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
306                 return;
307         }
308
309         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
310
311         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
312                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
313                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
314
315                 key = build_rt_key( subid, atoi( ts_field ) );
316                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
317                 if( rte != NULL ) {
318                         if( DEBUG || (vlevel > 1) ) {
319                                  fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
320                         }
321                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
322                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
323                 } else {
324                         if( DEBUG || (vlevel > 1) ) {
325                                 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
326                         }
327                 }
328         } else {
329                 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
330         }
331 }
332
333 /*
334         Parse a single record recevied from the route table generator, or read
335         from a static route table file.  Start records cause a new table to
336         be started (if a partial table was received it is discarded. Table
337         entry records are added to the currenly 'in progress' table, and an
338         end record causes the in progress table to be finalised and the
339         currently active table is replaced.
340
341         We expect one of several types:
342                 newrt|{start|end}
343                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
344                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
345 */
346 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
347         int i;
348         int ntoks;                                                      // number of tokens found in something
349         int ngtoks;
350         int     grp;                                                    // group number
351         rtable_ent_t*   rte;                            // route table entry added
352         char*   tokens[128];
353         char*   gtokens[64];                            // groups
354         char*   tok;                                            // pointer into a token or string
355
356         if( ! buf ) {
357                 return;
358         }
359
360         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
361                 buf++;
362         }
363         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
364         *(tok+1) = 0;
365
366         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
367                 switch( *(tokens[0]) ) {
368                         case 0:                                                                                                 // ignore blanks
369                                 // fallthrough
370                         case '#':                                                                                               // and comment lines
371                                 break;
372
373                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
374                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
375                                         break;
376                                 }
377
378                                 if( ntoks < 3 ) {
379                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
380                                         break;
381                                 }
382
383                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
384                                 ctx->new_rtable->updates++;
385                                 break;
386
387                         case 'n':                                                                                               // newrt|{start|end}
388                                 tokens[1] = clip( tokens[1] );
389                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
390                                         if( ctx->new_rtable ) {
391                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
392                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
393                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
394                                                 ctx->new_rtable = NULL;
395                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
396
397                                                 if( vlevel > 0 ) {
398                                                         fprintf( stderr, "[DBUG] old route table:\n" );
399                                                         rt_stats( ctx->old_rtable );
400                                                         fprintf( stderr, "[DBUG] new route table:\n" );
401                                                         rt_stats( ctx->rtable );
402                                                 }
403                                         } else {
404                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
405                                                 ctx->new_rtable = NULL;
406                                         }
407                                 } else {                                                                                        // start a new table.
408                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
409                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
410                                                 uta_rt_drop( ctx->new_rtable );
411                                         }
412
413                                         if( ctx->rtable )  {
414                                                 ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
415                                         } else {
416                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
417                                         }
418                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
419                                 }
420                                 break;
421
422                         case 'm':                                       // assume mse entry
423                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
424                                         break;
425                                 }
426
427                                 if( ntoks < 4 ) {
428                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
429                                         break;
430                                 }
431
432                                 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
433                                 ctx->new_rtable->updates++;
434                                 break;
435
436                         case 'r':                                       // assume rt entry
437                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
438                                         break;
439                                 }
440
441                                 ctx->new_rtable->updates++;
442                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
443                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
444                                 } else {
445                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
446                                 }
447                                 break;
448
449                         case 'u':                                                                                               // update current table, not a total replacement
450                                 tokens[1] = clip( tokens[1] );
451                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
452                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
453                                                 break;
454                                         }
455
456                                         if( ntoks >2 ) {
457                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
458                                                         fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
459                                                                 ctx->new_rtable->updates, tokens[2] );
460                                                         uta_rt_drop( ctx->new_rtable );
461                                                         ctx->new_rtable = NULL;
462                                                         break;
463                                                 }
464                                         }
465
466                                         if( ctx->new_rtable ) {
467                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
468                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
469                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
470                                                 ctx->new_rtable = NULL;
471                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
472
473                                                 if( vlevel > 0 ) {
474                                                         fprintf( stderr, "[DBUG] old route table:\n" );
475                                                         rt_stats( ctx->old_rtable );
476                                                         fprintf( stderr, "[DBUG] updated route table:\n" );
477                                                         rt_stats( ctx->rtable );
478                                                 }
479                                         } else {
480                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
481                                                 ctx->new_rtable = NULL;
482                                         }
483                                 } else {                                                                                        // start a new table.
484                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
485                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
486                                                 uta_rt_drop( ctx->new_rtable );
487                                         }
488
489                                         if( ctx->rtable )  {
490                                                 ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
491                                         } else {
492                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
493                                         }
494
495                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
496                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
497                                 }
498                                 break;
499
500                         default:
501                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
502                                 break;
503                 }
504         }
505 }
506
507 /*
508         This function attempts to open a static route table in order to create a 'seed'
509         table during initialisation.  The environment variable RMR_SEED_RT is expected
510         to contain the necessary path to the file. If missing, or if the file is empty,
511         no route table will be available until one is received from the generator.
512
513         This function is probably most useful for testing situations, or extreme
514         cases where the routes are static.
515 */
516 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
517         int             i;
518         char*   fname;
519         char*   fbuf;                           // buffer with file contents
520         char*   rec;                            // start of the record
521         char*   eor;                            // end of the record
522         int             rcount = 0;                     // record count for debug
523
524         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
525                 return;
526         }
527
528         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
529                 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
530                 return;
531         }
532
533         if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
534         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
535                 if( *eor == '\r' ) {
536                         *eor = '\n';                                                            // will look like a blank line which is ok
537                 }
538         }
539
540         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
541                 rcount++;
542                 if( (eor = strchr( rec, '\n' )) != NULL ) {
543                         *eor = 0;
544                 } else {
545                         fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
546                         fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
547                         free( fbuf );
548                         return;
549                 }
550
551                 parse_rt_rec( ctx, rec, vlevel );
552         }
553
554         if( DEBUG ) fprintf( stderr, "[DBUG] rmr:  seed route table successfully parsed: %d records\n", rcount );
555         free( fbuf );
556 }
557
558 /*
559         Callback driven for each named thing in a symtab. We collect the pointers to those
560         things for later use (cloning).
561 */
562 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
563         thing_list_t*   tl;
564
565         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
566                 return;
567         }
568
569         if( thing == NULL ) {
570                 return;
571         }
572
573         tl->things[tl->nused++] = thing;                // save a reference to the thing
574 }
575
576 /*
577         Called to delete a route table entry struct. We delete the array of endpoint
578         pointers, but NOT the endpoints referenced as those are referenced from
579         multiple entries.
580
581         Route table entries can be concurrently referenced by multiple symtabs, so
582         the actual delete happens only if decrementing the rte's ref count takes it
583         to 0. Thus, it is safe to call this function across a symtab when cleaning up
584         the symtab, or overlaying an entry.
585
586         This function uses ONLY the pointer to the rte (thing) and ignores the other
587         information that symtab foreach function passes (st, entry, and data) which
588         means that it _can_ safetly be used outside of the foreach setting. If
589         the function is changed to depend on any of these three, then a stand-alone
590         rte_cleanup() function should be added and referenced by this, and refererences
591         to this outside of the foreach world should be changed.
592 */
593 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
594         rtable_ent_t*   rte;
595         int i;
596
597         if( (rte = (rtable_ent_t *) thing) == NULL ) {
598                 return;
599         }
600
601         rte->refs--;
602         if( rte->refs > 0 ) {                   // something still referencing, so it lives
603                 return;
604         }
605
606         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
607                 for( i = 0; i < rte->nrrgroups; i++ ) {
608                         if( rte->rrgroups[i] ) {
609                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
610                         }
611                 }
612
613                 free( rte->rrgroups );
614         }
615
616         free( rte );                                                                                    // finally, drop the potato
617 }
618
619 /*
620         Read an entire file into a buffer. We assume for route table files
621         they will be smallish and so this won't be a problem.
622         Returns a pointer to the buffer, or nil. Caller must free.
623         Terminates the buffer with a nil character for string processing.
624
625         If we cannot stat the file, we assume it's empty or missing and return
626         an empty buffer, as opposed to a nil, so the caller can generate defaults
627         or error if an empty/missing file isn't tolerated.
628 */
629 static char* uta_fib( char* fname ) {
630         struct stat     stats;
631         off_t           fsize = 8192;   // size of the file
632         off_t           nread;                  // number of bytes read
633         int                     fd;
634         char*           buf;                    // input buffer
635
636         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
637                 if( fstat( fd, &stats ) >= 0 ) {
638                         if( stats.st_size <= 0 ) {                                      // empty file
639                                 close( fd );
640                                 fd = -1;
641                         } else {
642                                 fsize = stats.st_size;                                          // stat ok, save the file size
643                         }
644                 } else {
645                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
646                 }
647         }
648
649         if( fd < 0 ) {                                                                                  // didn't open or empty
650                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
651                         return NULL;
652                 }
653
654                 *buf = 0;
655                 return buf;
656         }
657
658         // add a size limit check here
659
660         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
661                 close( fd );
662                 errno = ENOMEM;
663                 return NULL;
664         }
665
666         nread = read( fd, buf, fsize );
667         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
668                 free( buf );
669                 errno = EFBIG;                                                                                  // likely too much to handle
670                 close( fd );
671                 return NULL;
672         }
673
674         buf[nread] = 0;
675
676         close( fd );
677         return buf;
678 }
679
680 /*
681         Create and initialise a route table; Returns a pointer to the table struct.
682 */
683 static route_table_t* uta_rt_init( ) {
684         route_table_t*  rt;
685
686         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
687                 return NULL;
688         }
689
690         if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) {               // modest size, prime
691                 free( rt );
692                 return NULL;
693         }
694
695         return rt;
696 }
697
698 /*
699         Clone (sort of) an existing route table.  This is done to preserve the endpoint
700         names referenced in a table (and thus existing sessions) when a new set
701         of message type to endpoint name mappings is received.  A new route table
702         with only endpoint name references is returned based on the active table in
703         the context.
704 */
705 static route_table_t* uta_rt_clone( route_table_t* srt ) {
706         endpoint_t*             ep;             // an endpoint
707         route_table_t*  nrt;    // new route table
708         void*   sst;                    // source symtab
709         void*   nst;                    // new symtab
710         thing_list_t things;
711         int i;
712
713         if( srt == NULL ) {
714                 return NULL;
715         }
716
717         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
718                 return NULL;
719         }
720
721         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
722                 free( nrt );
723                 return NULL;
724         }
725
726         things.nalloc = 2048;
727         things.nused = 0;
728         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
729         if( things.things == NULL ) {
730                 free( nrt->hash );
731                 free( nrt );
732                 return NULL;
733         }
734
735         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
736         nst = nrt->hash;
737
738         rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
739
740         for( i = 0; i < things.nused; i++ ) {
741                 ep = (endpoint_t *) things.things[i];
742                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
743         }
744
745         free( things.things );
746         return nrt;
747 }
748
749 /*
750         Clones _all_ of the given route table (references both endpoints AND the route table
751         entries. Needed to support a partial update where some route table entries will not
752         be deleted if not explicitly in the update.
753 */
754 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
755         endpoint_t*             ep;             // an endpoint
756         rtable_ent_t*   rte;    // a route table entry
757         route_table_t*  nrt;    // new route table
758         void*   sst;                    // source symtab
759         void*   nst;                    // new symtab
760         thing_list_t things0;   // things from space 0 (table entries)
761         thing_list_t things1;   // things from space 1 (end points)
762         int i;
763
764         if( srt == NULL ) {
765                 return NULL;
766         }
767
768         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
769                 return NULL;
770         }
771
772         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
773                 free( nrt );
774                 return NULL;
775         }
776
777         things0.nalloc = 2048;
778         things0.nused = 0;
779         things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
780         if( things0.things == NULL ) {
781                 free( nrt->hash );
782                 free( nrt );
783                 return NULL;
784         }
785
786         things1.nalloc = 2048;
787         things1.nused = 0;
788         things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
789         if( things1.things == NULL ) {
790                 free( nrt->hash );
791                 free( nrt );
792                 return NULL;
793         }
794
795         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
796         nst = nrt->hash;
797
798         rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
799         rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
800
801         for( i = 0; i < things0.nused; i++ ) {
802                 rte = (rtable_ent_t *) things0.things[i];
803                 rte->refs++;                                                                                            // rtes can be removed, so we track references
804                 rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
805         }
806
807         for( i = 0; i < things1.nused; i++ ) {
808                 ep = (endpoint_t *) things1.things[i];
809                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
810         }
811
812         free( things0.things );
813         free( things1.things );
814         return nrt;
815 }
816
817 /*
818         Given a name, find the endpoint struct in the provided route table.
819 */
820 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
821
822         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
823                 return NULL;
824         }
825
826         return rmr_sym_get( rt->hash, ep_name, 1 );
827 }
828
829 /*
830         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
831 */
832 static void uta_rt_drop( route_table_t* rt ) {
833         if( rt == NULL ) {
834                 return;
835         }
836
837         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
838         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
839         free( rt );
840 }
841
842 /*
843         Look up and return the pointer to the endpoint stuct matching the given name.
844         If not in the hash, a new endpoint is created, added to the hash. Should always
845         return a pointer.
846 */
847 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
848         endpoint_t*     ep;
849
850         if( !rt || !ep_name || ! *ep_name ) {
851                 fprintf( stderr, "[WRN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
852                 errno = EINVAL;
853                 return NULL;
854         }
855
856         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
857                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
858                         fprintf( stderr, "[WRN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
859                         errno = ENOMEM;
860                         return NULL;
861                 }
862
863                 ep->open = 0;                                                           // not connected
864                 ep->addr = uta_h2ip( ep_name );
865                 ep->name = strdup( ep_name );
866                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
867
868                 rmr_sym_put( rt->hash, ep_name, 1, ep );
869         }
870
871         return ep;
872 }
873
874
875 /*
876         Given a session id and message type build a key that can be used to look up the rte in the route
877         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
878 */
879 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
880         uint64_t key;
881
882         if( sub_id == UNSET_SUBID ) {
883                 key = 0xffffffff00000000 | mtype;
884         } else {
885                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
886         }
887
888         return key;
889 }
890
891
892 #endif