Allow endpoint selection based on meid in message
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58         const char** names;
59 } thing_list_t;
60
61 // ---- debugging/testing -------------------------------------------------------------------------
62
63 /*
64         Dump some stats for an endpoint in the RT. This is generally called to
65         verify endpoints after a table load/change.
66 */
67 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
68         int*    counter;
69         endpoint_t* ep;
70
71         if( (ep = (endpoint_t *) thing) == NULL ) {
72                 return;
73         }
74
75         if( (counter = (int *) vcounter) != NULL ) {
76                 (*counter)++;
77         }
78
79         fprintf( stderr, "[DBUG] RMR rt endpoint: target=%s open=%d\n", ep->name, ep->open );
80 }
81
82 /*
83         Called to count meid entries in the table. The meid points to an 'owning' endpoint
84         so we can list what we find
85 */
86 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
87         int*    counter;
88         endpoint_t* ep;
89
90         if( (ep = (endpoint_t *) thing) == NULL ) {
91                 return;
92         }
93
94         if( (counter = (int *) vcounter) != NULL ) {
95                 (*counter)++;
96         }
97
98         fprintf( stderr, "[DBUG] RMR meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
99 }
100
101 /*
102         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
103         the 'source' information and is added to each message.
104 */
105 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
106         endpoint_t* ep;
107         char*   id;
108
109         if( (ep = (endpoint_t *) thing) == NULL ) {
110                 return;
111         }
112
113         if( (id = (char *) vid) == NULL ) {
114                 id = "missing";
115         }
116
117         fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
118                 (long long) time( NULL ),
119                 id,
120                 ep->name,
121                 ep->open,
122                 ep->scounts[EPSC_GOOD],
123                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
124                 ep->scounts[EPSC_FAIL],
125                 ep->scounts[EPSC_TRANS]   );
126 }
127
128 /*
129         Dump stats for a route entry in the table.
130 */
131 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
132         int*    counter;
133         rtable_ent_t* rte;                      // thing is really an rte
134         int             mtype;
135         int             sid;
136
137         if( (rte = (rtable_ent_t *) thing) == NULL ) {
138                 return;
139         }
140
141         if( (counter = (int *) vcounter) != NULL ) {
142                 (*counter)++;
143         }
144
145         mtype = rte->key & 0xffff;
146         sid = (int) (rte->key >> 32);
147
148         fprintf( stderr, "[DBUG] RMR rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
149 }
150
151 /*
152         Given a route table, cause some stats to be spit out.
153 */
154 static void  rt_stats( route_table_t* rt ) {
155         int* counter;
156
157         if( rt == NULL ) {
158                 fprintf( stderr, "[DBUG] rtstats: nil table\n" );
159                 return;
160         }
161
162         counter = (int *) malloc( sizeof( int ) );
163         *counter = 0;
164         fprintf( stderr, "[DBUG] RMR route table stats:\n" );
165         fprintf( stderr, "[DBUG] RMR route table endpoints:\n" );
166         rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
167         fprintf( stderr, "[DBUG] RMR rtable: %d known endpoints\n", *counter );
168
169         fprintf( stderr, "[DBUG] RMR route table entries:\n" );
170         *counter = 0;
171         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
172         fprintf( stderr, "[DBUG] RMR rtable: %d mt entries in table\n", *counter );
173
174         fprintf( stderr, "[DBUG] RMR route table meid map:\n" );
175         *counter = 0;
176         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
177         fprintf( stderr, "[DBUG] RMR rtable: %d meids in map\n", *counter );
178
179         free( counter );
180 }
181
182 /*
183         Given a route table, cause endpoint counters to be written to stderr. The id
184         parm is written as the "source" in the output.
185 */
186 static void  rt_epcounts( route_table_t* rt, char* id ) {
187         if( rt == NULL ) {
188                 fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
189                 return;
190         }
191
192         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
193 }
194
195
196 // ------------------------------------------------------------------------------------------------
197 /*
198         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
199         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
200 */
201 static char* clip( char* buf ) {
202         char*   tok;
203
204         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
205                 buf++;
206         }
207
208         if( (tok = strchr( buf, '#' )) != NULL ) {
209                 if( tok == buf ) {
210                         return buf;                                     // just push back; leading comment sym handled there
211                 }
212
213                 if( isspace( *(tok-1) ) ) {
214                         *tok = 0;
215                 }
216         }
217
218         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
219         *(tok+1) = 0;
220
221         return buf;
222 }
223
224 /*
225         This accepts a pointer to a nil terminated string, and ensures that there is a
226         newline as the last character. If there is not, a new buffer is allocated and
227         the newline is added.  If a new buffer is allocated, the buffer passed in is
228         freed.  The function returns a pointer which the caller should use, and must
229         free.  In the event of an error, a nil pointer is returned.
230 */
231 static char* ensure_nlterm( char* buf ) {
232         char*   nb = NULL;
233         int             len = 1;
234
235         nb = buf;
236         if( buf == NULL || (len = strlen( buf )) < 2 ) {
237                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
238                         *nb = '\n';
239                         *(nb+1) = 0;
240                 }
241         } else {
242                 if( buf[len-1] != '\n' ) {
243                         fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
244                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
245                                 memcpy( nb, buf, len );
246                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
247                                 *(nb+len+1) = 0;
248                         }
249
250                         free( buf );
251                 }
252         }
253
254         return nb;
255 }
256
257 /*
258         Given a message type create a route table entry and add to the hash keyed on the
259         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
260         is the number of group slots to allocate in the entry.
261 */
262 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
263         rtable_ent_t* rte;
264         rtable_ent_t* old_rte;          // entry which was already in the table for the key
265
266         if( rt == NULL ) {
267                 return NULL;
268         }
269
270         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
271                 fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
272                 return NULL;
273         }
274         memset( rte, 0, sizeof( *rte ) );
275         rte->refs = 1;
276         rte->key = key;
277
278         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
279                 nrrgroups = 10;
280         }
281
282         if( nrrgroups ) {
283                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
284                         fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
285                         free( rte );
286                         return NULL;
287                 }
288                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
289         } else {
290                 rte->rrgroups = NULL;
291         }
292
293         rte->nrrgroups = nrrgroups;
294
295         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
296                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
297         }
298
299         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
300
301         if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
302         return rte;
303 }
304
305 /*
306         This accepts partially parsed information from an rte or mse record sent by route manager or read from
307         a file such that:
308                 ts_field is the msg-type,sender field
309                 subid is the integer subscription id
310                 rr_field is the endpoint information for round robening message over
311
312         If all goes well, this will add an RTE to the table under construction.
313
314         The ts_field is checked to see if we should ingest this record. We ingest if one of
315         these is true:
316                 there is no sender info (a generic entry for all)
317                 there is sender and our host:port matches one of the senders
318                 the sender info is an IP address that matches one of our IP addresses
319 */
320 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
321         rtable_ent_t*   rte;            // route table entry added
322         char*   tok;
323         int             ntoks;
324         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
325         char*   tokens[128];
326         char*   gtokens[64];
327         int             i;
328         int             ngtoks;                         // number of tokens in the group list
329         int             grp;                            // index into group list
330
331         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
332         rr_field = clip( rr_field );
333
334         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
335                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
336                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
337
338                 key = build_rt_key( subid, atoi( ts_field ) );
339
340                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
341
342                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
343                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
344                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
345                         }
346                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
347
348                         for( grp = 0; grp < ngtoks; grp++ ) {
349                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
350                                         for( i = 0; i < ntoks; i++ ) {
351                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
352                                                         if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint  ts=%s %s\n", ts_field, tokens[i] );
353                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
354                                                 }
355                                         }
356                                 }
357                         }
358                 }
359         } else {
360                 if( DEBUG || (vlevel > 2) ) {
361                         fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
362                 }
363         }
364 }
365
366 /*
367         Trash_entry takes a partially parsed record from the input and
368         will delete the entry if the sender,mtype matches us or it's a
369         generic mtype. The refernce in the new table is removed and the
370         refcounter for the actual rte is decreased. If that ref count is
371         0 then the memory is freed (handled byh the del_rte call).
372 */
373 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
374         rtable_ent_t*   rte;            // route table entry to be 'deleted'
375         char*   tok;
376         int             ntoks;
377         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
378         char*   tokens[128];
379
380         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
381                 return;
382         }
383
384         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
385
386         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
387                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
388                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
389
390                 key = build_rt_key( subid, atoi( ts_field ) );
391                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
392                 if( rte != NULL ) {
393                         if( DEBUG || (vlevel > 1) ) {
394                                  fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
395                         }
396                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
397                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
398                 } else {
399                         if( DEBUG || (vlevel > 1) ) {
400                                 fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
401                         }
402                 }
403         } else {
404                 if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
405         }
406 }
407
408 /*
409         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
410         the 'owner' which should be the dns name or IP address of an enpoint
411         the meid_list is a space separated list of me IDs
412
413         This function assumes the caller has vetted the pointers as needed.
414
415         For each meid in the list, an entry is pushed into the hash which references the owner
416         endpoint such that when the meid is used to route a message it references the endpoint
417         to send messages to.
418 */
419 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
420         char*   tok;
421         int             ntoks;
422         char*   tokens[128];
423         int             i;
424         int             state;
425         endpoint_t*     ep;                                             // endpoint struct for the owner
426
427         owner = clip( owner );                          // ditch extra whitespace and trailing comments
428         meid_list = clip( meid_list );
429
430         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
431         for( i = 0; i < ntoks; i++ ) {
432                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
433                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
434                         if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
435 fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
436                 } else {
437                         fprintf( stderr, "[WRN] rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
438                 }
439         }
440 }
441
442 /*
443         Given the tokens from an mme_del, delete the listed meid entries from the new
444         table. The list is a space separated list of meids.
445
446         The meids in the hash reference endpoints which are never deleted and so
447         the only thing that we need to do here is to remove the meid from the hash.
448
449         This function assumes the caller has vetted the pointers as needed.
450 */
451 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
452         char*   tok;
453         int             ntoks;
454         char*   tokens[128];
455         int             i;
456
457         if( rtab->hash == NULL ) {
458                 return;
459         }
460
461         meid_list = clip( meid_list );
462
463         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
464         for( i = 0; i < ntoks; i++ ) {
465                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
466                 if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_del: meid deleted: %s\n", tokens[i] );
467         }
468 }
469
470 /*
471         Parse a partially parsed meid record. Tokens[0] should be one of:
472                 meid_map, mme_ar, mme_del.
473 */
474 static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
475         if( tokens == NULL || ntoks < 1 ) {
476                 return;                                                 // silent but should never happen
477         }
478
479         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
480                 fprintf( stderr, "[ERR] meid_parse: not enough tokens on %s record\n", tokens[0] );
481                 return;
482         }
483
484         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
485                 tokens[1] = clip( tokens[1] );
486                 if( *(tokens[1]) == 's' ) {
487                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
488                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] meid map start: dropping incomplete table\n" );
489                                 uta_rt_drop( ctx->new_rtable );
490                         }
491
492                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
493                         ctx->new_rtable->mupdates = 0;
494                         if( DEBUG || (vlevel > 1)  ) fprintf( stderr, "[DBUG] meid_parse: meid map start found\n" );
495                 } else {
496                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
497                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
498                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
499                                                 fprintf( stderr, "[ERR] meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
500                                                 uta_rt_drop( ctx->new_rtable );
501                                                 ctx->new_rtable = NULL;
502                                                 return;
503                                         }
504
505                                         if( DEBUG ) fprintf( stderr, "[DBUG] meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
506                                 }
507
508                                 if( ctx->new_rtable ) {
509                                         uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
510                                         ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
511                                         ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
512                                         ctx->new_rtable = NULL;
513                                         if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of meid map noticed\n" );
514
515                                         if( vlevel > 0 ) {
516                                                 fprintf( stderr, "[DBUG] old route table:\n" );
517                                                 rt_stats( ctx->old_rtable );
518                                                 fprintf( stderr, "[DBUG] new route table:\n" );
519                                                 rt_stats( ctx->rtable );
520                                         }
521                                 } else {
522                                         if( DEBUG ) fprintf( stderr, "[DBUG] end of meid map noticed, but one was not started!\n" );
523                                         ctx->new_rtable = NULL;
524                                 }
525                         }
526                 }
527
528                 return;
529         }       
530
531         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
532                 if( DEBUG ) fprintf( stderr, "[DBUG] meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
533                 return;
534         }
535
536         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
537                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
538                         fprintf( stderr, "[ERR] meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
539                         return;
540                 }
541                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
542                 ctx->new_rtable->mupdates++;
543         }
544
545         if( strcmp( tokens[0], "mme_del" ) == 0 ) {
546                 if( ntoks < 2 ) {
547                         fprintf( stderr, "[ERR] meid_parse: mme_del record didn't have enough tokens\n" );
548                         return;
549                 }
550                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
551                 ctx->new_rtable->mupdates++;
552         }
553 }
554
555 /*
556         Parse a single record recevied from the route table generator, or read
557         from a static route table file.  Start records cause a new table to
558         be started (if a partial table was received it is discarded. Table
559         entry records are added to the currenly 'in progress' table, and an
560         end record causes the in progress table to be finalised and the
561         currently active table is replaced.
562
563         The updated table will be activated when the *|end record is encountered.
564         However, to allow for a "double" update, where both the meid map and the
565         route table must be updated at the same time, the end indication on a
566         route table (new or update) may specifiy "hold" which indicates that meid
567         map entries are to follow and the updated route table should be held as
568         pending until the end of the meid map is received and validated.
569
570         CAUTION:  we are assuming that there is a single route/meid map generator
571                 and as such only one type of update is received at a time; in other
572                 words, the sender cannot mix update records and if there is more than
573                 one sender process they must synchronise to avoid issues.
574
575
576         For a RT update, we expect:
577                 newrt|{start|end [hold]}
578                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
579                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
580                 mse| <mtype>[,sender] | <sub-id> | %meid
581
582
583         For a meid map update we expect:
584                 meid_map | start
585                 meid_map | end | <count> | <md5-hash>
586                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
587                 mme_del | <meid0> <meid1>...<meidn>
588
589 */
590 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
591         int i;
592         int ntoks;                                                      // number of tokens found in something
593         int ngtoks;
594         int     grp;                                                    // group number
595         rtable_ent_t*   rte;                            // route table entry added
596         char*   tokens[128];
597         char*   tok;                                            // pointer into a token or string
598
599         if( ! buf ) {
600                 return;
601         }
602
603         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
604                 buf++;
605         }
606         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
607         *(tok+1) = 0;
608
609         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
610                 tokens[0] = clip( tokens[0] );
611                 switch( *(tokens[0]) ) {
612                         case 0:                                                                                                 // ignore blanks
613                                 // fallthrough
614                         case '#':                                                                                               // and comment lines
615                                 break;
616
617                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
618                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
619                                         break;
620                                 }
621
622                                 if( ntoks < 3 ) {
623                                         if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
624                                         break;
625                                 }
626
627                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
628                                 ctx->new_rtable->updates++;
629                                 break;
630
631                         case 'n':                                                                                               // newrt|{start|end}
632                                 tokens[1] = clip( tokens[1] );
633                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
634                                         if( ctx->new_rtable ) {
635                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
636                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
637                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
638                                                 ctx->new_rtable = NULL;
639                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
640
641                                                 if( vlevel > 0 ) {
642                                                         fprintf( stderr, "[DBUG] old route table:\n" );
643                                                         rt_stats( ctx->old_rtable );
644                                                         fprintf( stderr, "[DBUG] new route table:\n" );
645                                                         rt_stats( ctx->rtable );
646                                                 }
647                                         } else {
648                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
649                                                 ctx->new_rtable = NULL;
650                                         }
651                                 } else {                                                                                        // start a new table.
652                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
653                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
654                                                 uta_rt_drop( ctx->new_rtable );
655                                         }
656
657                                         ctx->new_rtable = NULL;
658                                         ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
659                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
660                                 }
661                                 break;
662
663                         case 'm':                                                               // mse entry or one of the meid_ records
664                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
665                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
666                                                 break;
667                                         }
668
669                                         if( ntoks < 4 ) {
670                                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
671                                                 break;
672                                         }
673
674                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
675                                         ctx->new_rtable->updates++;
676                                 } else {
677                                         meid_parser( ctx, tokens, ntoks, vlevel );
678                                 }
679                                 break;
680
681                         case 'r':                                       // assume rt entry
682                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
683                                         break;
684                                 }
685
686                                 ctx->new_rtable->updates++;
687                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
688                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
689                                 } else {
690                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
691                                 }
692                                 break;
693
694                         case 'u':                                                                                               // update current table, not a total replacement
695                                 tokens[1] = clip( tokens[1] );
696                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
697                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
698                                                 break;
699                                         }
700
701                                         if( ntoks >2 ) {
702                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
703                                                         fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
704                                                                 ctx->new_rtable->updates, tokens[2] );
705                                                         uta_rt_drop( ctx->new_rtable );
706                                                         ctx->new_rtable = NULL;
707                                                         break;
708                                                 }
709                                         }
710
711                                         if( ctx->new_rtable ) {
712                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
713                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
714                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
715                                                 ctx->new_rtable = NULL;
716                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
717
718                                                 if( vlevel > 0 ) {
719                                                         fprintf( stderr, "[DBUG] old route table:\n" );
720                                                         rt_stats( ctx->old_rtable );
721                                                         fprintf( stderr, "[DBUG] updated route table:\n" );
722                                                         rt_stats( ctx->rtable );
723                                                 }
724                                         } else {
725                                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
726                                                 ctx->new_rtable = NULL;
727                                         }
728                                 } else {                                                                                        // start a new table.
729                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
730                                                 if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
731                                                 uta_rt_drop( ctx->new_rtable );
732                                         }
733
734                                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
735                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
736                                         if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
737                                 }
738                                 break;
739
740                         default:
741                                 if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
742                                 break;
743                 }
744         }
745 }
746
747 /*
748         This function attempts to open a static route table in order to create a 'seed'
749         table during initialisation.  The environment variable RMR_SEED_RT is expected
750         to contain the necessary path to the file. If missing, or if the file is empty,
751         no route table will be available until one is received from the generator.
752
753         This function is probably most useful for testing situations, or extreme
754         cases where the routes are static.
755 */
756 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
757         int             i;
758         char*   fname;
759         char*   fbuf;                           // buffer with file contents
760         char*   rec;                            // start of the record
761         char*   eor;                            // end of the record
762         int             rcount = 0;                     // record count for debug
763
764         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
765                 return;
766         }
767
768         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
769                 fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
770                 return;
771         }
772
773         if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
774         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
775                 if( *eor == '\r' ) {
776                         *eor = '\n';                                                            // will look like a blank line which is ok
777                 }
778         }
779
780         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
781                 rcount++;
782                 if( (eor = strchr( rec, '\n' )) != NULL ) {
783                         *eor = 0;
784                 } else {
785                         fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
786                         fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
787                         free( fbuf );
788                         return;
789                 }
790
791                 parse_rt_rec( ctx, rec, vlevel );
792         }
793
794         if( DEBUG ) fprintf( stderr, "[DBUG] rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
795         free( fbuf );
796 }
797
798 /*
799         Callback driven for each named thing in a symtab. We collect the pointers to those
800         things for later use (cloning).
801 */
802 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
803         thing_list_t*   tl;
804
805         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
806                 return;
807         }
808
809         if( thing == NULL ) {
810                 return;
811         }
812
813         tl->names[tl->nused] = name;                    // the name/key
814         tl->things[tl->nused++] = thing;                // save a reference to the thing
815 }
816
817 /*
818         Called to delete a route table entry struct. We delete the array of endpoint
819         pointers, but NOT the endpoints referenced as those are referenced from
820         multiple entries.
821
822         Route table entries can be concurrently referenced by multiple symtabs, so
823         the actual delete happens only if decrementing the rte's ref count takes it
824         to 0. Thus, it is safe to call this function across a symtab when cleaning up
825         the symtab, or overlaying an entry.
826
827         This function uses ONLY the pointer to the rte (thing) and ignores the other
828         information that symtab foreach function passes (st, entry, and data) which
829         means that it _can_ safetly be used outside of the foreach setting. If
830         the function is changed to depend on any of these three, then a stand-alone
831         rte_cleanup() function should be added and referenced by this, and refererences
832         to this outside of the foreach world should be changed.
833 */
834 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
835         rtable_ent_t*   rte;
836         int i;
837
838         if( (rte = (rtable_ent_t *) thing) == NULL ) {
839                 return;
840         }
841
842         rte->refs--;
843         if( rte->refs > 0 ) {                   // something still referencing, so it lives
844                 return;
845         }
846
847         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
848                 for( i = 0; i < rte->nrrgroups; i++ ) {
849                         if( rte->rrgroups[i] ) {
850                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
851                         }
852                 }
853
854                 free( rte->rrgroups );
855         }
856
857         free( rte );                                                                                    // finally, drop the potato
858 }
859
860 /*
861         Read an entire file into a buffer. We assume for route table files
862         they will be smallish and so this won't be a problem.
863         Returns a pointer to the buffer, or nil. Caller must free.
864         Terminates the buffer with a nil character for string processing.
865
866         If we cannot stat the file, we assume it's empty or missing and return
867         an empty buffer, as opposed to a nil, so the caller can generate defaults
868         or error if an empty/missing file isn't tolerated.
869 */
870 static char* uta_fib( char* fname ) {
871         struct stat     stats;
872         off_t           fsize = 8192;   // size of the file
873         off_t           nread;                  // number of bytes read
874         int                     fd;
875         char*           buf;                    // input buffer
876
877         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
878                 if( fstat( fd, &stats ) >= 0 ) {
879                         if( stats.st_size <= 0 ) {                                      // empty file
880                                 close( fd );
881                                 fd = -1;
882                         } else {
883                                 fsize = stats.st_size;                                          // stat ok, save the file size
884                         }
885                 } else {
886                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
887                 }
888         }
889
890         if( fd < 0 ) {                                                                                  // didn't open or empty
891                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
892                         return NULL;
893                 }
894
895                 *buf = 0;
896                 return buf;
897         }
898
899         // add a size limit check here
900
901         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
902                 close( fd );
903                 errno = ENOMEM;
904                 return NULL;
905         }
906
907         nread = read( fd, buf, fsize );
908         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
909                 free( buf );
910                 errno = EFBIG;                                                                                  // likely too much to handle
911                 close( fd );
912                 return NULL;
913         }
914
915         buf[nread] = 0;
916
917         close( fd );
918         return buf;
919 }
920
921 /*
922         Create and initialise a route table; Returns a pointer to the table struct.
923 */
924 static route_table_t* uta_rt_init( ) {
925         route_table_t*  rt;
926
927         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
928                 return NULL;
929         }
930
931         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
932                 free( rt );
933                 return NULL;
934         }
935
936         return rt;
937 }
938
939 /*
940         Clones one of the spaces in the given table.
941         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
942         Space is the space in the old table to copy. Space 0 uses an integer key and
943         references rte structs. All other spaces use a string key and reference endpoints.
944 */
945 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
946         endpoint_t*             ep;             // an endpoint
947         rtable_ent_t*   rte;    // a route table entry
948         void*   sst;                    // source symtab
949         void*   nst;                    // new symtab
950         thing_list_t things;    // things from the space to copy
951         int             i;
952         int             free_on_err = 0;
953
954         if( nrt == NULL ) {                             // make a new table if needed
955                 free_on_err = 1;
956                 nrt = uta_rt_init();
957         }
958
959         if( srt == NULL ) {             // source was nil, just give back the new table
960                 return nrt;
961         }
962
963         things.nalloc = 2048;
964         things.nused = 0;
965         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
966         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
967         if( things.things == NULL ) {
968                 if( free_on_err ) {
969                         free( nrt->hash );
970                         free( nrt );
971                         nrt = NULL;
972                 }
973
974                 return nrt;
975         }
976
977         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
978         nst = nrt->hash;
979
980         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
981
982         if( DEBUG ) fprintf( stderr, "[DBUG] clone space cloned %d things in space %d\n",  things.nused, space );
983         for( i = 0; i < things.nused; i++ ) {
984                 if( space ) {                                                                                           // string key, epoint reference
985                         ep = (endpoint_t *) things.things[i];
986                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
987                 } else {
988                         rte = (rtable_ent_t *) things.things[i];
989                         rte->refs++;                                                                                    // rtes can be removed, so we track references
990                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
991                 }
992         }
993
994         free( things.things );
995         free( (void *) things.names );
996         return nrt;
997 }
998
999 /*
1000         Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1001         The endpoint and meid entries in the hash must be preserved.
1002 */
1003 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1004         endpoint_t*             ep;                             // an endpoint
1005         rtable_ent_t*   rte;                    // a route table entry
1006         route_table_t*  nrt = NULL;             // new route table
1007         int i;
1008
1009         if( srt == NULL ) {
1010                 return uta_rt_init();           // no source to clone, just return an empty table
1011         }
1012
1013         nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
1014         rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
1015
1016         return nrt;
1017 }
1018
1019 /*
1020         Creates a new route table and then clones  _all_ of the given route table (references 
1021         both endpoints AND the route table entries. Needed to support a partial update where 
1022         some route table entries will not be deleted if not explicitly in the update and when 
1023         we are adding/replacing meid references.
1024 */
1025 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1026         endpoint_t*             ep;                             // an endpoint
1027         rtable_ent_t*   rte;                    // a route table entry
1028         route_table_t*  nrt = NULL;             // new route table
1029         int i;
1030
1031         if( srt == NULL ) {
1032                 return uta_rt_init();           // no source to clone, just return an empty table
1033         }
1034
1035         nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
1036         rt_clone_space( srt, nrt, RT_NAME_SPACE );
1037         rt_clone_space( srt, nrt, RT_ME_SPACE );
1038
1039         return nrt;
1040 }
1041
1042 /*
1043         Given a name, find the endpoint struct in the provided route table.
1044 */
1045 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1046
1047         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1048                 return NULL;
1049         }
1050
1051         return rmr_sym_get( rt->hash, ep_name, 1 );
1052 }
1053
1054 /*
1055         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1056 */
1057 static void uta_rt_drop( route_table_t* rt ) {
1058         if( rt == NULL ) {
1059                 return;
1060         }
1061
1062         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1063         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1064         free( rt );
1065 }
1066
1067 /*
1068         Look up and return the pointer to the endpoint stuct matching the given name.
1069         If not in the hash, a new endpoint is created, added to the hash. Should always
1070         return a pointer.
1071 */
1072 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1073         endpoint_t*     ep;
1074
1075         if( !rt || !ep_name || ! *ep_name ) {
1076                 fprintf( stderr, "[WRN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1077                 errno = EINVAL;
1078                 return NULL;
1079         }
1080
1081         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1082                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1083                         fprintf( stderr, "[WRN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1084                         errno = ENOMEM;
1085                         return NULL;
1086                 }
1087
1088                 ep->open = 0;                                                           // not connected
1089                 ep->addr = uta_h2ip( ep_name );
1090                 ep->name = strdup( ep_name );
1091                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1092                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1093
1094                 rmr_sym_put( rt->hash, ep_name, 1, ep );
1095         }
1096
1097         return ep;
1098 }
1099
1100
1101 /*
1102         Given a session id and message type build a key that can be used to look up the rte in the route
1103         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1104 */
1105 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1106         uint64_t key;
1107
1108         if( sub_id == UNSET_SUBID ) {
1109                 key = 0xffffffff00000000 | mtype;
1110         } else {
1111                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1112         }
1113
1114         return key;
1115 }
1116
1117
1118 #endif