3c73847a8db405064b64ba7b111117d412f55f87
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58         const char** names;
59 } thing_list_t;
60
61 // ---- debugging/testing -------------------------------------------------------------------------
62
63 /*
64         Dump some stats for an endpoint in the RT. This is generally called to
65         verify endpoints after a table load/change.
66 */
67 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
68         int*    counter;
69         endpoint_t* ep;
70
71         if( (ep = (endpoint_t *) thing) == NULL ) {
72                 return;
73         }
74
75         if( (counter = (int *) vcounter) != NULL ) {
76                 (*counter)++;
77         }
78
79         fprintf( stderr, "[DBUG] RMR rt endpoint: target=%s open=%d\n", ep->name, ep->open );
80 }
81
82 /*
83         Called to count meid entries in the table. The meid points to an 'owning' endpoint
84         so we can list what we find
85 */
86 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
87         int*    counter;
88         endpoint_t* ep;
89
90         if( (ep = (endpoint_t *) thing) == NULL ) {
91                 return;
92         }
93
94         if( (counter = (int *) vcounter) != NULL ) {
95                 (*counter)++;
96         }
97
98         fprintf( stderr, "[DBUG] RMR meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
99 }
100
101 /*
102         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
103         the 'source' information and is added to each message.
104 */
105 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
106         endpoint_t* ep;
107         char*   id;
108
109         if( (ep = (endpoint_t *) thing) == NULL ) {
110                 return;
111         }
112
113         if( (id = (char *) vid) == NULL ) {
114                 id = "missing";
115         }
116
117         fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
118                 (long long) time( NULL ),
119                 id,
120                 ep->name,
121                 ep->open,
122                 ep->scounts[EPSC_GOOD],
123                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
124                 ep->scounts[EPSC_FAIL],
125                 ep->scounts[EPSC_TRANS]   );
126 }
127
128 /*
129         Dump stats for a route entry in the table.
130 */
131 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
132         int*    counter;
133         rtable_ent_t* rte;                      // thing is really an rte
134         int             mtype;
135         int             sid;
136
137         if( (rte = (rtable_ent_t *) thing) == NULL ) {
138                 return;
139         }
140
141         if( (counter = (int *) vcounter) != NULL ) {
142                 (*counter)++;
143         }
144
145         mtype = rte->key & 0xffff;
146         sid = (int) (rte->key >> 32);
147
148         fprintf( stderr, "[DBUG] RMR rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
149 }
150
151 /*
152         Given a route table, cause some stats to be spit out.
153 */
154 static void  rt_stats( route_table_t* rt ) {
155         int* counter;
156
157         if( rt == NULL ) {
158                 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
159                 return;
160         }
161
162         counter = (int *) malloc( sizeof( int ) );
163         *counter = 0;
164         fprintf( stderr, "[DBUG] RMR route table stats:\n" );
165         fprintf( stderr, "[DBUG] RMR route table endpoints:\n" );
166         rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
167         fprintf( stderr, "[DBUG] RMR rtable: %d known endpoints\n", *counter );
168
169         fprintf( stderr, "[DBUG] RMR route table entries:\n" );
170         *counter = 0;
171         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
172         fprintf( stderr, "[DBUG] RMR rtable: %d mt entries in table\n", *counter );
173
174         fprintf( stderr, "[DBUG] RMR route table meid map:\n" );
175         *counter = 0;
176         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
177         fprintf( stderr, "[DBUG] RMR rtable: %d meids in map\n", *counter );
178
179         free( counter );
180 }
181
182 /*
183         Given a route table, cause endpoint counters to be written to stderr. The id
184         parm is written as the "source" in the output.
185 */
186 static void  rt_epcounts( route_table_t* rt, char* id ) {
187         if( rt == NULL ) {
188                 fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
189                 return;
190         }
191
192         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
193 }
194
195
196 // ------------------------------------------------------------------------------------------------
197 /*
198         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
199         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
200 */
201 static char* clip( char* buf ) {
202         char*   tok;
203
204         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
205                 buf++;
206         }
207
208         if( (tok = strchr( buf, '#' )) != NULL ) {
209                 if( tok == buf ) {
210                         return buf;                                     // just push back; leading comment sym handled there
211                 }
212
213                 if( isspace( *(tok-1) ) ) {
214                         *tok = 0;
215                 }
216         }
217
218         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
219         *(tok+1) = 0;
220
221         return buf;
222 }
223
224 /*
225         This accepts a pointer to a nil terminated string, and ensures that there is a
226         newline as the last character. If there is not, a new buffer is allocated and
227         the newline is added.  If a new buffer is allocated, the buffer passed in is
228         freed.  The function returns a pointer which the caller should use, and must
229         free.  In the event of an error, a nil pointer is returned.
230 */
231 static char* ensure_nlterm( char* buf ) {
232         char*   nb = NULL;
233         int             len = 1;
234
235         nb = buf;
236         if( buf == NULL || (len = strlen( buf )) < 2 ) {
237                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
238                         *nb = '\n';
239                         *(nb+1) = 0;
240                 }
241         } else {
242                 if( buf[len-1] != '\n' ) {
243                         fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
244                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
245                                 memcpy( nb, buf, len );
246                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
247                                 *(nb+len+1) = 0;
248                         }
249
250                         free( buf );
251                 }
252         }
253
254         return nb;
255 }
256
257 /*
258         Given a message type create a route table entry and add to the hash keyed on the
259         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
260         is the number of group slots to allocate in the entry.
261 */
262 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
263         rtable_ent_t* rte;
264         rtable_ent_t* old_rte;          // entry which was already in the table for the key
265
266         if( rt == NULL ) {
267                 return NULL;
268         }
269
270         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
271                 fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
272                 return NULL;
273         }
274         memset( rte, 0, sizeof( *rte ) );
275         rte->refs = 1;
276         rte->key = key;
277
278         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
279                 nrrgroups = 10;
280         }
281
282         if( nrrgroups ) {
283                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
284                         fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
285                         free( rte );
286                         return NULL;
287                 }
288                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
289         } else {
290                 rte->rrgroups = NULL;
291         }
292
293         rte->nrrgroups = nrrgroups;
294
295         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
296                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
297         }
298
299         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
300
301         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
302         return rte;
303 }
304
305 /*
306         This accepts partially parsed information from an rte or mse record sent by route manager or read from
307         a file such that:
308                 ts_field is the msg-type,sender field
309                 subid is the integer subscription id
310                 rr_field is the endpoint information for round robening message over
311
312         If all goes well, this will add an RTE to the table under construction.
313
314         The ts_field is checked to see if we should ingest this record. We ingest if one of
315         these is true:
316                 there is no sender info (a generic entry for all)
317                 there is sender and our host:port matches one of the senders
318                 the sender info is an IP address that matches one of our IP addresses
319 */
320 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
321         rtable_ent_t*   rte;            // route table entry added
322         char*   tok;
323         int             ntoks;
324         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
325         char*   tokens[128];
326         char*   gtokens[64];
327         int             i;
328         int             ngtoks;                         // number of tokens in the group list
329         int             grp;                            // index into group list
330
331         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
332         rr_field = clip( rr_field );
333
334         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
335                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
336                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
337
338                 key = build_rt_key( subid, atoi( ts_field ) );
339
340                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
341
342                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
343                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
344                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
345                         }
346                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
347                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
348
349                         for( grp = 0; grp < ngtoks; grp++ ) {
350                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
351                                         for( i = 0; i < ntoks; i++ ) {
352                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
353                                                         if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint  ts=%s %s\n", ts_field, tokens[i] );
354                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
355                                                 }
356                                         }
357                                 }
358                         }
359                 }
360         } else {
361                 if( DEBUG || (vlevel > 2) ) {
362                         fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
363                 }
364         }
365 }
366
367 /*
368         Trash_entry takes a partially parsed record from the input and
369         will delete the entry if the sender,mtype matches us or it's a
370         generic mtype. The refernce in the new table is removed and the
371         refcounter for the actual rte is decreased. If that ref count is
372         0 then the memory is freed (handled byh the del_rte call).
373 */
374 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
375         rtable_ent_t*   rte;            // route table entry to be 'deleted'
376         char*   tok;
377         int             ntoks;
378         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
379         char*   tokens[128];
380
381         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
382                 return;
383         }
384
385         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
386
387         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
388                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
389                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
390
391                 key = build_rt_key( subid, atoi( ts_field ) );
392                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
393                 if( rte != NULL ) {
394                         if( DEBUG || (vlevel > 1) ) {
395                                  fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
396                         }
397                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
398                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
399                 } else {
400                         if( DEBUG || (vlevel > 1) ) {
401                                 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
402                         }
403                 }
404         } else {
405                 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
406         }
407 }
408
409 /*
410         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
411         the 'owner' which should be the dns name or IP address of an enpoint
412         the meid_list is a space separated list of me IDs
413
414         This function assumes the caller has vetted the pointers as needed.
415
416         For each meid in the list, an entry is pushed into the hash which references the owner
417         endpoint such that when the meid is used to route a message it references the endpoint
418         to send messages to.
419 */
420 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
421         char*   tok;
422         int             ntoks;
423         char*   tokens[128];
424         int             i;
425         int             state;
426         endpoint_t*     ep;                                             // endpoint struct for the owner
427
428         owner = clip( owner );                          // ditch extra whitespace and trailing comments
429         meid_list = clip( meid_list );
430
431         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
432         for( i = 0; i < ntoks; i++ ) {
433                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
434                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
435                         if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
436 fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
437                 } else {
438                         fprintf( stderr, "[WRN] rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
439                 }
440         }
441 }
442
443 /*
444         Given the tokens from an mme_del, delete the listed meid entries from the new
445         table. The list is a space separated list of meids.
446
447         The meids in the hash reference endpoints which are never deleted and so
448         the only thing that we need to do here is to remove the meid from the hash.
449
450         This function assumes the caller has vetted the pointers as needed.
451 */
452 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
453         char*   tok;
454         int             ntoks;
455         char*   tokens[128];
456         int             i;
457
458         if( rtab->hash == NULL ) {
459                 return;
460         }
461
462         meid_list = clip( meid_list );
463
464         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
465         for( i = 0; i < ntoks; i++ ) {
466                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
467                 if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_del: meid deleted: %s\n", tokens[i] );
468         }
469 }
470
471 /*
472         Parse a partially parsed meid record. Tokens[0] should be one of:
473                 meid_map, mme_ar, mme_del.
474 */
475 static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
476         if( tokens == NULL || ntoks < 1 ) {
477                 return;                                                 // silent but should never happen
478         }
479
480         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
481                 fprintf( stderr, "[ERR] meid_parse: not enough tokens on %s record\n", tokens[0] );
482                 return;
483         }
484
485         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
486                 tokens[1] = clip( tokens[1] );
487                 if( *(tokens[1]) == 's' ) {
488                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
489                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] meid map start: dropping incomplete table\n" );
490                                 uta_rt_drop( ctx->new_rtable );
491                         }
492
493                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
494                         ctx->new_rtable->mupdates = 0;
495                         if( DEBUG || (vlevel > 1)  ) fprintf( stderr, "[DBUG] meid_parse: meid map start found\n" );
496                 } else {
497                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
498                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
499                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
500                                                 fprintf( stderr, "[ERR] meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
501                                                 uta_rt_drop( ctx->new_rtable );
502                                                 ctx->new_rtable = NULL;
503                                                 return;
504                                         }
505
506                                         if( DEBUG ) fprintf( stderr, "[DBUG] meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
507                                 }
508
509                                 if( ctx->new_rtable ) {
510                                         uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
511                                         ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
512                                         ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
513                                         ctx->new_rtable = NULL;
514                                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of meid map noticed\n" );
515
516                                         if( vlevel > 0 ) {
517                                                 fprintf( stderr, "[DBUG] old route table:\n" );
518                                                 rt_stats( ctx->old_rtable );
519                                                 fprintf( stderr, "[DBUG] new route table:\n" );
520                                                 rt_stats( ctx->rtable );
521                                         }
522                                 } else {
523                                         if( DEBUG ) fprintf( stderr, "[DBUG] end of meid map noticed, but one was not started!\n" );
524                                         ctx->new_rtable = NULL;
525                                 }
526                         }
527                 }
528
529                 return;
530         }       
531
532         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
533                 if( DEBUG ) fprintf( stderr, "[DBUG] meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
534                 return;
535         }
536
537         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
538                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
539                         fprintf( stderr, "[ERR] meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
540                         return;
541                 }
542                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
543                 ctx->new_rtable->mupdates++;
544         }
545
546         if( strcmp( tokens[0], "mme_del" ) == 0 ) {
547                 if( ntoks < 2 ) {
548                         fprintf( stderr, "[ERR] meid_parse: mme_del record didn't have enough tokens\n" );
549                         return;
550                 }
551                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
552                 ctx->new_rtable->mupdates++;
553         }
554 }
555
556 /*
557         Parse a single record recevied from the route table generator, or read
558         from a static route table file.  Start records cause a new table to
559         be started (if a partial table was received it is discarded. Table
560         entry records are added to the currenly 'in progress' table, and an
561         end record causes the in progress table to be finalised and the
562         currently active table is replaced.
563
564         The updated table will be activated when the *|end record is encountered.
565         However, to allow for a "double" update, where both the meid map and the
566         route table must be updated at the same time, the end indication on a
567         route table (new or update) may specifiy "hold" which indicates that meid
568         map entries are to follow and the updated route table should be held as
569         pending until the end of the meid map is received and validated.
570
571         CAUTION:  we are assuming that there is a single route/meid map generator
572                 and as such only one type of update is received at a time; in other
573                 words, the sender cannot mix update records and if there is more than
574                 one sender process they must synchronise to avoid issues.
575
576
577         For a RT update, we expect:
578                 newrt|{start|end [hold]}
579                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
580                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
581                 mse| <mtype>[,sender] | <sub-id> | %meid
582
583
584         For a meid map update we expect:
585                 meid_map | start
586                 meid_map | end | <count> | <md5-hash>
587                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
588                 mme_del | <meid0> <meid1>...<meidn>
589
590 */
591 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
592         int i;
593         int ntoks;                                                      // number of tokens found in something
594         int ngtoks;
595         int     grp;                                                    // group number
596         rtable_ent_t*   rte;                            // route table entry added
597         char*   tokens[128];
598         char*   tok;                                            // pointer into a token or string
599
600         if( ! buf ) {
601                 return;
602         }
603
604         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
605                 buf++;
606         }
607         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
608         *(tok+1) = 0;
609
610         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
611                 tokens[0] = clip( tokens[0] );
612                 switch( *(tokens[0]) ) {
613                         case 0:                                                                                                 // ignore blanks
614                                 // fallthrough
615                         case '#':                                                                                               // and comment lines
616                                 break;
617
618                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
619                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
620                                         break;
621                                 }
622
623                                 if( ntoks < 3 ) {
624                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
625                                         break;
626                                 }
627
628                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
629                                 ctx->new_rtable->updates++;
630                                 break;
631
632                         case 'n':                                                                                               // newrt|{start|end}
633                                 tokens[1] = clip( tokens[1] );
634                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
635                                         if( ctx->new_rtable ) {
636                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
637                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
638                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
639                                                 ctx->new_rtable = NULL;
640                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
641
642                                                 if( vlevel > 0 ) {
643                                                         fprintf( stderr, "[DBUG] old route table:\n" );
644                                                         rt_stats( ctx->old_rtable );
645                                                         fprintf( stderr, "[DBUG] new route table:\n" );
646                                                         rt_stats( ctx->rtable );
647                                                 }
648                                         } else {
649                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
650                                                 ctx->new_rtable = NULL;
651                                         }
652                                 } else {                                                                                        // start a new table.
653                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
654                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
655                                                 uta_rt_drop( ctx->new_rtable );
656                                         }
657
658                                         ctx->new_rtable = NULL;
659                                         ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
660                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
661                                 }
662                                 break;
663
664                         case 'm':                                                               // mse entry or one of the meid_ records
665                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
666                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
667                                                 break;
668                                         }
669
670                                         if( ntoks < 4 ) {
671                                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
672                                                 break;
673                                         }
674
675                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
676                                         ctx->new_rtable->updates++;
677                                 } else {
678                                         meid_parser( ctx, tokens, ntoks, vlevel );
679                                 }
680                                 break;
681
682                         case 'r':                                       // assume rt entry
683                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
684                                         break;
685                                 }
686
687                                 ctx->new_rtable->updates++;
688                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
689                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
690                                 } else {
691                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
692                                 }
693                                 break;
694
695                         case 'u':                                                                                               // update current table, not a total replacement
696                                 tokens[1] = clip( tokens[1] );
697                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
698                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
699                                                 break;
700                                         }
701
702                                         if( ntoks >2 ) {
703                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
704                                                         fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
705                                                                 ctx->new_rtable->updates, tokens[2] );
706                                                         uta_rt_drop( ctx->new_rtable );
707                                                         ctx->new_rtable = NULL;
708                                                         break;
709                                                 }
710                                         }
711
712                                         if( ctx->new_rtable ) {
713                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
714                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
715                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
716                                                 ctx->new_rtable = NULL;
717                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
718
719                                                 if( vlevel > 0 ) {
720                                                         fprintf( stderr, "[DBUG] old route table:\n" );
721                                                         rt_stats( ctx->old_rtable );
722                                                         fprintf( stderr, "[DBUG] updated route table:\n" );
723                                                         rt_stats( ctx->rtable );
724                                                 }
725                                         } else {
726                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
727                                                 ctx->new_rtable = NULL;
728                                         }
729                                 } else {                                                                                        // start a new table.
730                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
731                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
732                                                 uta_rt_drop( ctx->new_rtable );
733                                         }
734
735                                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
736                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
737                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
738                                 }
739                                 break;
740
741                         default:
742                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
743                                 break;
744                 }
745         }
746 }
747
748 /*
749         This function attempts to open a static route table in order to create a 'seed'
750         table during initialisation.  The environment variable RMR_SEED_RT is expected
751         to contain the necessary path to the file. If missing, or if the file is empty,
752         no route table will be available until one is received from the generator.
753
754         This function is probably most useful for testing situations, or extreme
755         cases where the routes are static.
756 */
757 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
758         int             i;
759         char*   fname;
760         char*   fbuf;                           // buffer with file contents
761         char*   rec;                            // start of the record
762         char*   eor;                            // end of the record
763         int             rcount = 0;                     // record count for debug
764
765         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
766                 return;
767         }
768
769         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
770                 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
771                 return;
772         }
773
774         if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
775         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
776                 if( *eor == '\r' ) {
777                         *eor = '\n';                                                            // will look like a blank line which is ok
778                 }
779         }
780
781         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
782                 rcount++;
783                 if( (eor = strchr( rec, '\n' )) != NULL ) {
784                         *eor = 0;
785                 } else {
786                         fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
787                         fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
788                         free( fbuf );
789                         return;
790                 }
791
792                 parse_rt_rec( ctx, rec, vlevel );
793         }
794
795         if( DEBUG ) fprintf( stderr, "[DBUG] rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
796         free( fbuf );
797 }
798
799 /*
800         Callback driven for each named thing in a symtab. We collect the pointers to those
801         things for later use (cloning).
802 */
803 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
804         thing_list_t*   tl;
805
806         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
807                 return;
808         }
809
810         if( thing == NULL ) {
811                 return;
812         }
813
814         tl->names[tl->nused] = name;                    // the name/key
815         tl->things[tl->nused++] = thing;                // save a reference to the thing
816 }
817
818 /*
819         Called to delete a route table entry struct. We delete the array of endpoint
820         pointers, but NOT the endpoints referenced as those are referenced from
821         multiple entries.
822
823         Route table entries can be concurrently referenced by multiple symtabs, so
824         the actual delete happens only if decrementing the rte's ref count takes it
825         to 0. Thus, it is safe to call this function across a symtab when cleaning up
826         the symtab, or overlaying an entry.
827
828         This function uses ONLY the pointer to the rte (thing) and ignores the other
829         information that symtab foreach function passes (st, entry, and data) which
830         means that it _can_ safetly be used outside of the foreach setting. If
831         the function is changed to depend on any of these three, then a stand-alone
832         rte_cleanup() function should be added and referenced by this, and refererences
833         to this outside of the foreach world should be changed.
834 */
835 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
836         rtable_ent_t*   rte;
837         int i;
838
839         if( (rte = (rtable_ent_t *) thing) == NULL ) {
840                 return;
841         }
842
843         rte->refs--;
844         if( rte->refs > 0 ) {                   // something still referencing, so it lives
845                 return;
846         }
847
848         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
849                 for( i = 0; i < rte->nrrgroups; i++ ) {
850                         if( rte->rrgroups[i] ) {
851                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
852                         }
853                 }
854
855                 free( rte->rrgroups );
856         }
857
858         free( rte );                                                                                    // finally, drop the potato
859 }
860
861 /*
862         Read an entire file into a buffer. We assume for route table files
863         they will be smallish and so this won't be a problem.
864         Returns a pointer to the buffer, or nil. Caller must free.
865         Terminates the buffer with a nil character for string processing.
866
867         If we cannot stat the file, we assume it's empty or missing and return
868         an empty buffer, as opposed to a nil, so the caller can generate defaults
869         or error if an empty/missing file isn't tolerated.
870 */
871 static char* uta_fib( char* fname ) {
872         struct stat     stats;
873         off_t           fsize = 8192;   // size of the file
874         off_t           nread;                  // number of bytes read
875         int                     fd;
876         char*           buf;                    // input buffer
877
878         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
879                 if( fstat( fd, &stats ) >= 0 ) {
880                         if( stats.st_size <= 0 ) {                                      // empty file
881                                 close( fd );
882                                 fd = -1;
883                         } else {
884                                 fsize = stats.st_size;                                          // stat ok, save the file size
885                         }
886                 } else {
887                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
888                 }
889         }
890
891         if( fd < 0 ) {                                                                                  // didn't open or empty
892                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
893                         return NULL;
894                 }
895
896                 *buf = 0;
897                 return buf;
898         }
899
900         // add a size limit check here
901
902         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
903                 close( fd );
904                 errno = ENOMEM;
905                 return NULL;
906         }
907
908         nread = read( fd, buf, fsize );
909         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
910                 free( buf );
911                 errno = EFBIG;                                                                                  // likely too much to handle
912                 close( fd );
913                 return NULL;
914         }
915
916         buf[nread] = 0;
917
918         close( fd );
919         return buf;
920 }
921
922 /*
923         Create and initialise a route table; Returns a pointer to the table struct.
924 */
925 static route_table_t* uta_rt_init( ) {
926         route_table_t*  rt;
927
928         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
929                 return NULL;
930         }
931
932         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
933                 free( rt );
934                 return NULL;
935         }
936
937         return rt;
938 }
939
940 /*
941         Clones one of the spaces in the given table.
942         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
943         Space is the space in the old table to copy. Space 0 uses an integer key and
944         references rte structs. All other spaces use a string key and reference endpoints.
945 */
946 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
947         endpoint_t*             ep;             // an endpoint
948         rtable_ent_t*   rte;    // a route table entry
949         void*   sst;                    // source symtab
950         void*   nst;                    // new symtab
951         thing_list_t things;    // things from the space to copy
952         int             i;
953         int             free_on_err = 0;
954
955         if( nrt == NULL ) {                             // make a new table if needed
956                 free_on_err = 1;
957                 nrt = uta_rt_init();
958         }
959
960         if( srt == NULL ) {             // source was nil, just give back the new table
961                 return nrt;
962         }
963
964         things.nalloc = 2048;
965         things.nused = 0;
966         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
967         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
968         if( things.things == NULL ) {
969                 if( free_on_err ) {
970                         free( nrt->hash );
971                         free( nrt );
972                         nrt = NULL;
973                 }
974
975                 return nrt;
976         }
977
978         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
979         nst = nrt->hash;
980
981         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
982
983         if( DEBUG ) fprintf( stderr, "[DBUG] clone space cloned %d things in space %d\n",  things.nused, space );
984         for( i = 0; i < things.nused; i++ ) {
985                 if( space ) {                                                                                           // string key, epoint reference
986                         ep = (endpoint_t *) things.things[i];
987                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
988                 } else {
989                         rte = (rtable_ent_t *) things.things[i];
990                         rte->refs++;                                                                                    // rtes can be removed, so we track references
991                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
992                 }
993         }
994
995         free( things.things );
996         free( (void *) things.names );
997         return nrt;
998 }
999
1000 /*
1001         Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1002         The endpoint and meid entries in the hash must be preserved.
1003 */
1004 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1005         endpoint_t*             ep;                             // an endpoint
1006         rtable_ent_t*   rte;                    // a route table entry
1007         route_table_t*  nrt = NULL;             // new route table
1008         int i;
1009
1010         if( srt == NULL ) {
1011                 return uta_rt_init();           // no source to clone, just return an empty table
1012         }
1013
1014         nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
1015         rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
1016
1017         return nrt;
1018 }
1019
1020 /*
1021         Creates a new route table and then clones  _all_ of the given route table (references 
1022         both endpoints AND the route table entries. Needed to support a partial update where 
1023         some route table entries will not be deleted if not explicitly in the update and when 
1024         we are adding/replacing meid references.
1025 */
1026 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1027         endpoint_t*             ep;                             // an endpoint
1028         rtable_ent_t*   rte;                    // a route table entry
1029         route_table_t*  nrt = NULL;             // new route table
1030         int i;
1031
1032         if( srt == NULL ) {
1033                 return uta_rt_init();           // no source to clone, just return an empty table
1034         }
1035
1036         nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
1037         rt_clone_space( srt, nrt, RT_NAME_SPACE );
1038         rt_clone_space( srt, nrt, RT_ME_SPACE );
1039
1040         return nrt;
1041 }
1042
1043 /*
1044         Given a name, find the endpoint struct in the provided route table.
1045 */
1046 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1047
1048         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1049                 return NULL;
1050         }
1051
1052         return rmr_sym_get( rt->hash, ep_name, 1 );
1053 }
1054
1055 /*
1056         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1057 */
1058 static void uta_rt_drop( route_table_t* rt ) {
1059         if( rt == NULL ) {
1060                 return;
1061         }
1062
1063         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1064         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1065         free( rt );
1066 }
1067
1068 /*
1069         Look up and return the pointer to the endpoint stuct matching the given name.
1070         If not in the hash, a new endpoint is created, added to the hash. Should always
1071         return a pointer.
1072 */
1073 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1074         endpoint_t*     ep;
1075
1076         if( !rt || !ep_name || ! *ep_name ) {
1077                 fprintf( stderr, "[WRN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1078                 errno = EINVAL;
1079                 return NULL;
1080         }
1081
1082         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1083                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1084                         fprintf( stderr, "[WRN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1085                         errno = ENOMEM;
1086                         return NULL;
1087                 }
1088
1089                 ep->notify = 1;                                                         // show notification on first connection failure
1090                 ep->open = 0;                                                           // not connected
1091                 ep->addr = uta_h2ip( ep_name );
1092                 ep->name = strdup( ep_name );
1093                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1094                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1095
1096                 rmr_sym_put( rt->hash, ep_name, 1, ep );
1097         }
1098
1099         return ep;
1100 }
1101
1102
1103 /*
1104         Given a session id and message type build a key that can be used to look up the rte in the route
1105         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1106 */
1107 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1108         uint64_t key;
1109
1110         if( sub_id == UNSET_SUBID ) {
1111                 key = 0xffffffff00000000 | mtype;
1112         } else {
1113                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1114         }
1115
1116         return key;
1117 }
1118
1119
1120 #endif