feat(routing): Support session based routing
[ric-plt/lib/rmr.git] / src / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58 } thing_list_t;
59
60 // ------------------------------------------------------------------------------------------------
61
62
63 /*
64         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
65         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
66 */
67 static char* clip( char* buf ) {
68         char*   tok;
69
70         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
71                 buf++;
72         }
73
74         if( (tok = strchr( buf, '#' )) != NULL ) {
75                 if( tok == buf ) {
76                         return buf;                                     // just push back; leading comment sym handled there
77                 }
78
79                 if( isspace( *(tok-1) ) ) {
80                         *tok = 0;
81                 }
82         }
83
84         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
85         *(tok+1) = 0;
86
87         return buf;
88 }
89
90
91 /*
92         Given a message type create a route table entry and add to the hash keyed on the
93         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
94         is the number of group slots to allocate in the entry.
95 */
96 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
97         rtable_ent_t* rte;
98
99         if( rt == NULL ) {
100                 return NULL;
101         }
102
103         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
104                 fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
105                 return NULL;
106         }
107         memset( rte, 0, sizeof( *rte ) );
108
109         if( nrrgroups <= 0 ) {
110                 nrrgroups = 10;
111         }
112
113         if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
114                 fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
115                 free( rte );
116                 return NULL;
117         }
118         memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
119         rte->nrrgroups = nrrgroups;
120
121         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
122
123         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups );
124         return rte;
125 }
126
127 /*
128         This accepts partially parsed information from a record sent by route manager or read from
129         a file such that:
130                 ts_field is the msg-type,sender field
131                 subid is the integer subscription id
132                 rr_field is the endpoint information for round robening message over
133
134         If all goes well, this will add an RTE to the table under construction.
135
136         The ts_field is checked to see if we should ingest this record. We ingest if one of
137         these is true:
138                 there is no sender info (a generic entry for all)
139                 there is sender and our host:port matches one of the senders
140                 the sender info is an IP address that matches one of our IP addresses
141 */
142 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
143         rtable_ent_t*   rte;            // route table entry added
144         char*   tok;
145         int             ntoks;
146         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
147         char*   tokens[128];
148         char*   gtokens[64];
149         int             i;
150         int             ngtoks;                         // number of tokens in the group list
151         int             grp;                            // index into group list
152
153         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
154         rr_field = clip( rr_field );
155
156         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
157                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
158                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
159
160                         key = build_rt_key( subid, atoi( ts_field ) );
161
162                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key );
163
164                         if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
165                                 rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
166
167                                 for( grp = 0; grp < ngtoks; grp++ ) {
168                                         if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
169                                                 for( i = 0; i < ntoks; i++ ) {
170                                                         if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG]    add endpoint  %s\n", ts_field );
171                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
172                                                 }
173                                         }
174                                 }
175                         }
176                 } else {
177                         if( DEBUG || (vlevel > 2) )
178                                 fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
179                 }
180 }
181
182 /*
183         Parse a single record recevied from the route table generator, or read
184         from a static route table file.  Start records cause a new table to
185         be started (if a partial table was received it is discarded. Table
186         entry records are added to the currenly 'in progress' table, and an
187         end record causes the in progress table to be finalised and the
188         currently active table is replaced.
189
190         We expect one of several types:
191                 newrt|{start|end}
192                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
193                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
194 */
195 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
196         int i;
197         int ntoks;                                                      // number of tokens found in something
198         int ngtoks;
199         int     grp;                                                    // group number
200         rtable_ent_t*   rte;                            // route table entry added
201         char*   tokens[128];
202         char*   gtokens[64];                            // groups
203         char*   tok;                                            // pointer into a token or string
204
205         if( ! buf ) {
206                 return;
207         }
208
209         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
210                 buf++;
211         }
212         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
213         *(tok+1) = 0;
214
215         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
216                 switch( *(tokens[0]) ) {
217                         case 0:                                                                                                 // ignore blanks
218                                 // fallthrough
219                         case '#':                                                                                               // and comment lines
220                                 break;
221
222                         case 'n':                                                                                               // newrt|{start|end}
223                                 tokens[1] = clip( tokens[1] );
224                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
225                                         if( ctx->new_rtable ) {
226                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
227                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
228                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
229                                                 ctx->new_rtable = NULL;
230                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
231                                         } else {
232                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
233                                                 ctx->new_rtable = NULL;
234                                         }
235                                 } else {                                                                                        // start a new table.
236                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
237                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
238                                                 uta_rt_drop( ctx->new_rtable );
239                                         }
240
241                                         if( ctx->rtable )  {
242                                                 ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint entries from active table
243                                         } else {
244                                                 ctx->new_rtable = uta_rt_init(  );                              // don't have one yet, just crate empty
245                                         }
246                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
247                                 }
248                                 break;
249
250                         case 'm':                                       // assume mse entry
251                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
252                                         break;
253                                 }
254
255                                 if( ntoks < 4 ) {
256                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
257                                         break;
258                                 }
259
260                                 build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
261                                 break;
262
263                         case 'r':                                       // assume rt entry
264                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
265                                         break;
266                                 }
267
268                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
269                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
270                                 } else {
271                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
272                                 }
273                                 break;
274
275                         default:
276                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
277                                 break;
278                 }
279         }
280 }
281
282 /*
283         This function attempts to open a static route table in order to create a 'seed'
284         table during initialisation.  The environment variable RMR_SEED_RT is expected
285         to contain the necessary path to the file. If missing, or if the file is empty,
286         no route table will be available until one is received from the generator.
287
288         This function is probably most useful for testing situations, or extreme
289         cases where the routes are static.
290 */
291 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
292         int             i;
293         char*   fname;
294         char*   fbuf;                           // buffer with file contents
295         char*   rec;                            // start of the record
296         char*   eor;                            // end of the record
297         int             rcount = 0;                     // record count for debug
298
299         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
300                 return;
301         }
302
303         if( (fbuf = uta_fib( fname ) ) == NULL ) {                      // read file into a single buffer
304                 fprintf( stderr, "[WRN] seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
305                 return;
306         }
307
308         if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully opened: %s\n", fname );
309         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
310                 rcount++;
311                 if( (eor = strchr( rec, '\n' )) != NULL ) {
312                         *eor = 0;
313                 }
314
315                 parse_rt_rec( ctx, rec, vlevel );
316         }
317
318         if( DEBUG ) fprintf( stderr, "[INFO] seed route table successfully parsed: %d records\n", rcount );
319         free( fbuf );
320 }
321
322 /*
323         Callback driven for each named thing in a symtab. We collect the pointers to those
324         things for later use (cloning).
325 */
326 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
327         thing_list_t*   tl;
328
329         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
330                 return;
331         }
332
333         if( thing == NULL ) {
334                 return;
335         }
336
337         tl->things[tl->nused++] = thing;                // save a reference to the thing
338 }
339
340 /*
341         Called to delete a route table entry struct. We delete the array of endpoint
342         pointers, but NOT the endpoints referenced as those are referenced from
343         multiple entries.
344 */
345 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
346         rtable_ent_t*   rte;
347         int i;
348
349         if( (rte = (rtable_ent_t *) thing) == NULL ) {
350                 return;
351         }
352
353         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
354                 for( i = 0; i < rte->nrrgroups; i++ ) {
355                         if( rte->rrgroups[i] ) {
356                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
357                         }
358                 }
359
360                 free( rte->rrgroups );
361         }
362
363         free( rte );                                                                                    // finally, drop the potato
364 }
365
366 /*
367         Read an entire file into a buffer. We assume for route table files
368         they will be smallish and so this won't be a problem.
369         Returns a pointer to the buffer, or nil. Caller must free.
370         Terminates the buffer with a nil character for string processing.
371
372         If we cannot stat the file, we assume it's empty or missing and return
373         an empty buffer, as opposed to a nil, so the caller can generate defaults
374         or error if an empty/missing file isn't tolerated.
375 */
376 static char* uta_fib( char* fname ) {
377         struct stat     stats;
378         off_t           fsize = 8192;   // size of the file
379         off_t           nread;                  // number of bytes read
380         int                     fd;
381         char*           buf;                    // input buffer
382
383         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
384                 if( fstat( fd, &stats ) >= 0 ) {
385                         if( stats.st_size <= 0 ) {                                      // empty file
386                                 close( fd );
387                                 fd = -1;
388                         } else {
389                                 fsize = stats.st_size;                                          // stat ok, save the file size
390                         }
391                 } else {
392                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
393                 }
394         }
395
396         if( fd < 0 ) {                                                                                  // didn't open or empty
397                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
398                         return NULL;
399                 }
400
401                 *buf = 0;
402                 return buf;
403         }
404
405         // add a size limit check here
406
407         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
408                 close( fd );
409                 errno = ENOMEM;
410                 return NULL;
411         }
412
413         nread = read( fd, buf, fsize );
414         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
415                 free( buf );
416                 errno = EFBIG;                                                                                  // likely too much to handle
417                 close( fd );
418                 return NULL;
419         }
420
421         buf[nread] = 0;
422
423         close( fd );
424         return buf;
425 }
426
427 /*
428         Create and initialise a route table; Returns a pointer to the table struct.
429 */
430 static route_table_t* uta_rt_init( ) {
431         route_table_t*  rt;
432
433         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
434                 return NULL;
435         }
436
437         if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) {               // modest size, prime
438                 free( rt );
439                 return NULL;
440         }
441
442         return rt;
443 }
444
445 /*
446         Clone (sort of) an existing route table.  This is done to preserve the endpoint
447         names referenced in a table (and thus existing sessions) when a new set
448         of message type to endpoint name mappings is received.  A new route table
449         with only endpoint name references is returned based on the active table in
450         the context.
451 */
452 static route_table_t* uta_rt_clone( route_table_t* srt ) {
453         endpoint_t*             ep;             // an endpoint
454         route_table_t*  nrt;    // new route table
455         route_table_t*  art;    // active route table
456         void*   sst;                    // source symtab
457         void*   nst;                    // new symtab
458         thing_list_t things;
459         int i;
460
461         if( srt == NULL ) {
462                 return NULL;
463         }
464
465         if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
466                 return NULL;
467         }
468
469         if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) {              // modest size, prime
470                 free( nrt );
471                 return NULL;
472         }
473
474         things.nalloc = 2048;
475         things.nused = 0;
476         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
477         if( things.things == NULL ) {
478                 free( nrt->hash );
479                 free( nrt );
480                 return NULL;
481         }
482
483         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
484         nst = nrt->hash;
485
486         rmr_sym_foreach_class( sst, 1, collect_things, &things );               // collect the named endpoints in the active table
487
488         for( i = 0; i < things.nused; i++ ) {
489                 ep = (endpoint_t *) things.things[i];
490                 rmr_sym_put( nst, ep->name, 1, ep );                                            // slam this one into the new table
491         }
492
493         free( things.things );
494         return nrt;
495 }
496
497 /*
498         Given a name, find the endpoint struct in the provided route table.
499 */
500 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
501
502         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
503                 return NULL;
504         }
505
506         return rmr_sym_get( rt->hash, ep_name, 1 );
507 }
508
509 /*
510         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
511 */
512 static void uta_rt_drop( route_table_t* rt ) {
513         if( rt == NULL ) {
514                 return;
515         }
516
517         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
518         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
519         free( rt );
520 }
521
522 /*
523         Look up and return the pointer to the endpoint stuct matching the given name.
524         If not in the hash, a new endpoint is created, added to the hash. Should always
525         return a pointer.
526 */
527 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
528         endpoint_t*     ep;
529
530         if( !rt || !ep_name || ! *ep_name ) {
531                 fprintf( stderr, "[WARN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
532                 errno = EINVAL;
533                 return NULL;
534         }
535
536         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
537                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
538                         fprintf( stderr, "[WARN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
539                         errno = ENOMEM;
540                         return NULL;
541                 }
542
543                 ep->open = 0;                                   // not connected
544                 ep->addr = uta_h2ip( ep_name );
545                 ep->name = strdup( ep_name );
546
547                 rmr_sym_put( rt->hash, ep_name, 1, ep );
548         }
549
550         return ep;
551 }
552
553
554 /*
555         Given a session id and message type build a key that can be used to look up the rte in the route
556         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
557 */
558 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
559         uint64_t key;
560
561         if( sub_id == UNSET_SUBID ) {
562                 key = 0xffffffff00000000 | mtype;
563         } else {
564                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
565         }
566
567         return key;
568 }
569
570
571 #endif