enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58 } thing_list_t;
59
60 // ---- debugging/testing -------------------------------------------------------------------------
61
62 /*
63         Dump stats for an endpoint in the RT.
64 */
65 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
66         int*    counter;
67         endpoint_t* ep;
68
69         if( (ep = (endpoint_t *) thing) == NULL ) {
70                 return;
71         }
72
73         if( (counter = (int *) vcounter) != NULL ) {
74                 (*counter)++;
75         }
76
77         fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
78 }
79
80 /*
81         Dump stats for a route entry in the table.
82 */
83 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
84         int*    counter;
85         rtable_ent_t* rte;                      // thing is really an rte
86         int             mtype;
87         int             sid;
88
89         if( (rte = (rtable_ent_t *) thing) == NULL ) {
90                 return;
91         }
92
93         if( (counter = (int *) vcounter) != NULL ) {
94                 (*counter)++;
95         }
96
97         mtype = rte->key & 0xffff;
98         sid = (int) (rte->key >> 32);
99
100         fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
101 }
102
103 /*
104         Given a route table, cause some stats to be spit out.
105 */
106 static void  rt_stats( route_table_t* rt ) {
107         int* counter;
108
109         if( rt == NULL ) {
110                 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
111                 return;
112         }
113
114         counter = (int *) malloc( sizeof( int ) );
115         *counter = 0;
116         fprintf( stderr, "[DBUG] rtstats:\n" );
117         rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter );                // run endpoints in the active table
118         fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
119
120         *counter = 0;
121         rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter );               // run entries
122         fprintf( stderr, "[DBUG] %d entries\n", *counter );
123
124         free( counter );
125 }
126
127
128 // ------------------------------------------------------------------------------------------------
129 /*
130         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
131         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
132 */
133 static char* clip( char* buf ) {
134         char*   tok;
135
136         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
137                 buf++;
138         }
139
140         if( (tok = strchr( buf, '#' )) != NULL ) {
141                 if( tok == buf ) {
142                         return buf;                                     // just push back; leading comment sym handled there
143                 }
144
145                 if( isspace( *(tok-1) ) ) {
146                         *tok = 0;
147                 }
148         }
149
150         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
151         *(tok+1) = 0;
152
153         return buf;
154 }
155
156
157 /*
158         Given a message type create a route table entry and add to the hash keyed on the
159         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
160         is the number of group slots to allocate in the entry.
161 */
162 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
163         rtable_ent_t* rte;
164         rtable_ent_t* old_rte;          // entry which was already in the table for the key
165
166         if( rt == NULL ) {
167                 return NULL;
168         }
169
170         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
171                 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
172                 return NULL;
173         }
174         memset( rte, 0, sizeof( *rte ) );
175         rte->refs = 1;
176         rte->key = key;
177
178         if( nrrgroups <= 0 ) {
179                 nrrgroups = 10;
180         }
181
182         if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
183                 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
184                 free( rte );
185                 return NULL;
186         }
187         memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
188         rte->nrrgroups = nrrgroups;
189
190         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
191                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
192         }
193
194         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
195
196         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
197         return rte;
198 }
199
200 /*
201         This accepts partially parsed information from a record sent by route manager or read from
202         a file such that:
203                 ts_field is the msg-type,sender field
204                 subid is the integer subscription id
205                 rr_field is the endpoint information for round robening message over
206
207         If all goes well, this will add an RTE to the table under construction.
208
209         The ts_field is checked to see if we should ingest this record. We ingest if one of
210         these is true:
211                 there is no sender info (a generic entry for all)
212                 there is sender and our host:port matches one of the senders
213                 the sender info is an IP address that matches one of our IP addresses
214 */
215 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
216         rtable_ent_t*   rte;            // route table entry added
217         char*   tok;
218         int             ntoks;
219         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
220         char*   tokens[128];
221         char*   gtokens[64];
222         int             i;
223         int             ngtoks;                         // number of tokens in the group list
224         int             grp;                            // index into group list
225
226         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
227         rr_field = clip( rr_field );
228
229         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
230                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
231                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
232
233                         key = build_rt_key( subid, atoi( ts_field ) );
234
235                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
236
237                         if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
238                                 rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
239
240                                 for( grp = 0; grp < ngtoks; grp++ ) {
241                                         if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
242                                                 for( i = 0; i < ntoks; i++ ) {
243                                                         if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", ts_field );
244                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
245                                                 }
246                                         }
247                                 }
248                         }
249                 } else {
250                         if( DEBUG || (vlevel > 2) )
251                                 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
252                 }
253 }
254
255 /*
256         Trash_entry takes a partially parsed record from the input and
257         will delete the entry if the sender,mtype matches us or it's a
258         generic mtype. The refernce in the new table is removed and the
259         refcounter for the actual rte is decreased. If that ref count is
260         0 then the memory is freed (handled byh the del_rte call).
261 */
262 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
263         rtable_ent_t*   rte;            // route table entry to be 'deleted'
264         char*   tok;
265         int             ntoks;
266         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
267         char*   tokens[128];
268
269         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
270                 return;
271         }
272
273         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
274
275         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
276                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
277                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
278
279                 key = build_rt_key( subid, atoi( ts_field ) );
280                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
281                 if( rte != NULL ) {
282                         if( DEBUG || (vlevel > 1) ) {
283                                  fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
284                         }
285                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
286                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
287                 } else {
288                         if( DEBUG || (vlevel > 1) ) {
289                                 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
290                         }
291                 }
292         } else {
293                 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
294         }
295 }
296
297 /*
298         Parse a single record recevied from the route table generator, or read
299         from a static route table file.  Start records cause a new table to
300         be started (if a partial table was received it is discarded. Table
301         entry records are added to the currenly 'in progress' table, and an
302         end record causes the in progress table to be finalised and the
303         currently active table is replaced.
304
305         We expect one of several types:
306                 newrt|{start|end}
307                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
308                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
309 */
310 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
311         int i;
312         int ntoks;                                                      // number of tokens found in something
313         int ngtoks;
314         int     grp;                                                    // group number
315         rtable_ent_t*   rte;                            // route table entry added
316         char*   tokens[128];
317         char*   gtokens[64];                            // groups
318         char*   tok;                                            // pointer into a token or string
319
320         if( ! buf ) {
321                 return;
322         }
323
324         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
325                 buf++;
326         }
327         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
328         *(tok+1) = 0;
329
330         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
331                 switch( *(tokens[0]) ) {
332                         case 0:                                                                                                 // ignore blanks
333                                 // fallthrough
334                         case '#':                                                                                               // and comment lines
335                                 break;
336
337                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
338                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
339                                         break;
340                                 }
341
342                                 if( ntoks < 3 ) {
343                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
344                                         break;
345                                 }
346
347                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
348                                 ctx->new_rtable->updates++;
349                                 break;
350
351                         case 'n':                                                                                               // newrt|{start|end}
352                                 tokens[1] = clip( tokens[1] );
353                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
354                                         if( ctx->new_rtable ) {
355                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
356                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
357                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
358                                                 ctx->new_rtable = NULL;
359                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
360
361                                                 if( vlevel > 0 ) {
362                                                         fprintf( stderr, "[DBUG] old route table:\n" );
363                                                         rt_stats( ctx->old_rtable );
364                                                         fprintf( stderr, "[DBUG] new route table:\n" );
365                                                         rt_stats( ctx->rtable );
366                                                 }
367                                         } else {
368                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
369                                                 ctx->new_rtable = NULL;
370                                         }
371                                 } else {                                                                                        // start a new table.
372                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
373                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
374                                                 uta_rt_drop( ctx->new_rtable );
375                                         }
376
377                                         if( ctx->rtable )  {
378                                                 ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
379                                         } else {
380                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
381                                         }
382                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
383                                 }
384                                 break;
385
386                         case 'm':                                       // assume mse entry
387                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
388                                         break;
389                                 }
390
391                                 if( ntoks < 4 ) {
392                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
393                                         break;
394                                 }
395
396                                 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
397                                 ctx->new_rtable->updates++;
398                                 break;
399
400                         case 'r':                                       // assume rt entry
401                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
402                                         break;
403                                 }
404
405                                 ctx->new_rtable->updates++;
406                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
407                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
408                                 } else {
409                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
410                                 }
411                                 break;
412
413                         case 'u':                                                                                               // update current table, not a total replacement
414                                 tokens[1] = clip( tokens[1] );
415                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
416                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
417                                                 break;
418                                         }
419
420                                         if( ntoks >2 ) {
421                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
422                                                         fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
423                                                                 ctx->new_rtable->updates, tokens[2] );
424                                                         uta_rt_drop( ctx->new_rtable );
425                                                         ctx->new_rtable = NULL;
426                                                         break;
427                                                 }
428                                         }
429
430                                         if( ctx->new_rtable ) {
431                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
432                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
433                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
434                                                 ctx->new_rtable = NULL;
435                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
436
437                                                 if( vlevel > 0 ) {
438                                                         fprintf( stderr, "[DBUG] old route table:\n" );
439                                                         rt_stats( ctx->old_rtable );
440                                                         fprintf( stderr, "[DBUG] updated route table:\n" );
441                                                         rt_stats( ctx->rtable );
442                                                 }
443                                         } else {
444                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
445                                                 ctx->new_rtable = NULL;
446                                         }
447                                 } else {                                                                                        // start a new table.
448                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
449                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
450                                                 uta_rt_drop( ctx->new_rtable );
451                                         }
452
453                                         if( ctx->rtable )  {
454                                                 ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
455                                         } else {
456                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
457                                         }
458
459                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
460                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
461                                 }
462                                 break;
463
464                         default:
465                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
466                                 break;
467                 }
468         }
469 }
470
471 /*
472         This function attempts to open a static route table in order to create a 'seed'
473         table during initialisation.  The environment variable RMR_SEED_RT is expected
474         to contain the necessary path to the file. If missing, or if the file is empty,
475         no route table will be available until one is received from the generator.
476
477         This function is probably most useful for testing situations, or extreme
478         cases where the routes are static.
479 */
480 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
481         int             i;
482         char*   fname;
483         char*   fbuf;                           // buffer with file contents
484         char*   rec;                            // start of the record
485         char*   eor;                            // end of the record
486         int             rcount = 0;                     // record count for debug
487
488         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
489                 return;
490         }
491
492         if( (fbuf = uta_fib( fname ) ) == NULL ) {                      // read file into a single buffer
493                 fprintf( stderr, "[WRN] seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
494                 return;
495         }
496
497         if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully opened: %s\n", fname );
498         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
499                 rcount++;
500                 if( (eor = strchr( rec, '\n' )) != NULL ) {
501                         *eor = 0;
502                 }
503
504                 parse_rt_rec( ctx, rec, vlevel );
505         }
506
507         if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully parsed: %d records\n", rcount );
508         free( fbuf );
509 }
510
511 /*
512         Callback driven for each named thing in a symtab. We collect the pointers to those
513         things for later use (cloning).
514 */
515 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
516         thing_list_t*   tl;
517
518         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
519                 return;
520         }
521
522         if( thing == NULL ) {
523                 return;
524         }
525
526         tl->things[tl->nused++] = thing;                // save a reference to the thing
527 }
528
529 /*
530         Called to delete a route table entry struct. We delete the array of endpoint
531         pointers, but NOT the endpoints referenced as those are referenced from
532         multiple entries.
533
534         Route table entries can be concurrently referenced by multiple symtabs, so
535         the actual delete happens only if decrementing the rte's ref count takes it
536         to 0. Thus, it is safe to call this function across a symtab when cleaning up
537         the symtab, or overlaying an entry.
538
539         This function uses ONLY the pointer to the rte (thing) and ignores the other
540         information that symtab foreach function passes (st, entry, and data) which
541         means that it _can_ safetly be used outside of the foreach setting. If
542         the function is changed to depend on any of these three, then a stand-alone
543         rte_cleanup() function should be added and referenced by this, and refererences
544         to this outside of the foreach world should be changed.
545 */
546 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
547         rtable_ent_t*   rte;
548         int i;
549
550         if( (rte = (rtable_ent_t *) thing) == NULL ) {
551                 return;
552         }
553
554         rte->refs--;
555         if( rte->refs > 0 ) {                   // something still referencing, so it lives
556                 return;
557         }
558
559         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
560                 for( i = 0; i < rte->nrrgroups; i++ ) {
561                         if( rte->rrgroups[i] ) {
562                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
563                         }
564                 }
565
566                 free( rte->rrgroups );
567         }
568
569         free( rte );                                                                                    // finally, drop the potato
570 }
571
572 /*
573         Read an entire file into a buffer. We assume for route table files
574         they will be smallish and so this won't be a problem.
575         Returns a pointer to the buffer, or nil. Caller must free.
576         Terminates the buffer with a nil character for string processing.
577
578         If we cannot stat the file, we assume it's empty or missing and return
579         an empty buffer, as opposed to a nil, so the caller can generate defaults
580         or error if an empty/missing file isn't tolerated.
581 */
582 static char* uta_fib( char* fname ) {
583         struct stat     stats;
584         off_t           fsize = 8192;   // size of the file
585         off_t           nread;                  // number of bytes read
586         int                     fd;
587         char*           buf;                    // input buffer
588
589         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
590                 if( fstat( fd, &stats ) >= 0 ) {
591                         if( stats.st_size <= 0 ) {                                      // empty file
592                                 close( fd );
593                                 fd = -1;
594                         } else {
595                                 fsize = stats.st_size;                                          // stat ok, save the file size
596                         }
597                 } else {
598                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
599                 }
600         }
601
602         if( fd < 0 ) {                                                                                  // didn't open or empty
603                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
604                         return NULL;
605                 }
606
607                 *buf = 0;
608                 return buf;
609         }
610
611         // add a size limit check here
612
613         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
614                 close( fd );
615                 errno = ENOMEM;
616                 return NULL;
617         }
618
619         nread = read( fd, buf, fsize );
620         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
621                 free( buf );
622                 errno = EFBIG;                                                                                  // likely too much to handle
623                 close( fd );
624                 return NULL;
625         }
626
627         buf[nread] = 0;
628
629         close( fd );
630         return buf;
631 }
632
633 /*
634         Create and initialise a route table; Returns a pointer to the table struct.
635 */
636 static route_table_t* uta_rt_init( ) {
637         route_table_t*  rt;
638
639         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
640                 return NULL;
641         }
642
643         if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) {               // modest size, prime
644                 free( rt );
645                 return NULL;
646         }
647
648         return rt;
649 }
650
651 /*
652         Clone (sort of) an existing route table.  This is done to preserve the endpoint
653         names referenced in a table (and thus existing sessions) when a new set
654         of message type to endpoint name mappings is received.  A new route table
655         with only endpoint name references is returned based on the active table in
656         the context.
657 */
658 static route_table_t* uta_rt_clone( route_table_t* srt ) {
659         endpoint_t*             ep;             // an endpoint
660         route_table_t*  nrt;    // new route table
661         void*   sst;                    // source symtab
662         void*   nst;                    // new symtab
663         thing_list_t things;
664         int i;
665
666         if( srt == NULL ) {
667                 return NULL;
668         }
669
670         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
671                 return NULL;
672         }
673
674         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
675                 free( nrt );
676                 return NULL;
677         }
678
679         things.nalloc = 2048;
680         things.nused = 0;
681         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
682         if( things.things == NULL ) {
683                 free( nrt->hash );
684                 free( nrt );
685                 return NULL;
686         }
687
688         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
689         nst = nrt->hash;
690
691         rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
692
693         for( i = 0; i < things.nused; i++ ) {
694                 ep = (endpoint_t *) things.things[i];
695                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
696         }
697
698         free( things.things );
699         return nrt;
700 }
701
702 /*
703         Clones _all_ of the given route table (references both endpoints AND the route table
704         entries. Needed to support a partial update where some route table entries will not
705         be deleted if not explicitly in the update.
706 */
707 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
708         endpoint_t*             ep;             // an endpoint
709         rtable_ent_t*   rte;    // a route table entry
710         route_table_t*  nrt;    // new route table
711         void*   sst;                    // source symtab
712         void*   nst;                    // new symtab
713         thing_list_t things0;   // things from space 0 (table entries)
714         thing_list_t things1;   // things from space 1 (end points)
715         int i;
716
717         if( srt == NULL ) {
718                 return NULL;
719         }
720
721         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
722                 return NULL;
723         }
724
725         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
726                 free( nrt );
727                 return NULL;
728         }
729
730         things0.nalloc = 2048;
731         things0.nused = 0;
732         things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
733         if( things0.things == NULL ) {
734                 free( nrt->hash );
735                 free( nrt );
736                 return NULL;
737         }
738
739         things1.nalloc = 2048;
740         things1.nused = 0;
741         things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
742         if( things1.things == NULL ) {
743                 free( nrt->hash );
744                 free( nrt );
745                 return NULL;
746         }
747
748         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
749         nst = nrt->hash;
750
751         rmr_sym_foreach_class( sst, 0, collect_things, &things0 );              // collect the rtes
752         rmr_sym_foreach_class( sst, 1, collect_things, &things1 );              // collect the named endpoints in the active table
753
754         for( i = 0; i < things0.nused; i++ ) {
755                 rte = (rtable_ent_t *) things0.things[i];
756                 rte->refs++;                                                                                            // rtes can be removed, so we track references
757                 rmr_sym_map( nst, rte->key, rte );                                                      // add to hash using numeric mtype/sub-id as key (default to space 0)
758         }
759
760         for( i = 0; i < things1.nused; i++ ) {
761                 ep = (endpoint_t *) things1.things[i];
762                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
763         }
764
765         free( things0.things );
766         free( things1.things );
767         return nrt;
768 }
769
770 /*
771         Given a name, find the endpoint struct in the provided route table.
772 */
773 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
774
775         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
776                 return NULL;
777         }
778
779         return rmr_sym_get( rt->hash, ep_name, 1 );
780 }
781
782 /*
783         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
784 */
785 static void uta_rt_drop( route_table_t* rt ) {
786         if( rt == NULL ) {
787                 return;
788         }
789
790         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
791         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
792         free( rt );
793 }
794
795 /*
796         Look up and return the pointer to the endpoint stuct matching the given name.
797         If not in the hash, a new endpoint is created, added to the hash. Should always
798         return a pointer.
799 */
800 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
801         endpoint_t*     ep;
802
803         if( !rt || !ep_name || ! *ep_name ) {
804                 fprintf( stderr, "[WARN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
805                 errno = EINVAL;
806                 return NULL;
807         }
808
809         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
810                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
811                         fprintf( stderr, "[WARN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
812                         errno = ENOMEM;
813                         return NULL;
814                 }
815
816                 ep->open = 0;                                                           // not connected
817                 ep->addr = uta_h2ip( ep_name );
818                 ep->name = strdup( ep_name );
819                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
820
821                 rmr_sym_put( rt->hash, ep_name, 1, ep );
822         }
823
824         return ep;
825 }
826
827
828 /*
829         Given a session id and message type build a key that can be used to look up the rte in the route
830         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
831 */
832 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
833         uint64_t key;
834
835         if( sub_id == UNSET_SUBID ) {
836                 key = 0xffffffff00000000 | mtype;
837         } else {
838                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
839         }
840
841         return key;
842 }
843
844
845 #endif