12edecd53de4e07de83c52056140f0bb1f3e731b
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49
50 /*
51         Passed to a symtab foreach callback to construct a list of pointers from
52         a current symtab.
53 */
54 typedef struct thing_list {
55         int nalloc;
56         int nused;
57         void** things;
58         const char** names;
59 } thing_list_t;
60
61 // ---- debugging/testing -------------------------------------------------------------------------
62
63 /*
64         Dump some stats for an endpoint in the RT. This is generally called to
65         verify endpoints after a table load/change.
66 */
67 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
68         int*    counter;
69         endpoint_t* ep;
70
71         if( (ep = (endpoint_t *) thing) == NULL ) {
72                 return;
73         }
74
75         if( (counter = (int *) vcounter) != NULL ) {
76                 (*counter)++;
77         }
78
79         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
80 }
81
82 /*
83         Called to count meid entries in the table. The meid points to an 'owning' endpoint
84         so we can list what we find
85 */
86 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
87         int*    counter;
88         endpoint_t* ep;
89
90         if( (ep = (endpoint_t *) thing) == NULL ) {
91                 return;
92         }
93
94         if( (counter = (int *) vcounter) != NULL ) {
95                 (*counter)++;
96         }
97
98         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
99 }
100
101 /*
102         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
103         the 'source' information and is added to each message.
104 */
105 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
106         endpoint_t* ep;
107         char*   id;
108
109         if( (ep = (endpoint_t *) thing) == NULL ) {
110                 return;
111         }
112
113         if( (id = (char *) vid) == NULL ) {
114                 id = "missing";
115         }
116
117         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
118                 (long long) time( NULL ),
119                 id,
120                 ep->name,
121                 ep->open,
122                 ep->scounts[EPSC_GOOD],
123                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
124                 ep->scounts[EPSC_FAIL],
125                 ep->scounts[EPSC_TRANS]   );
126 }
127
128 /*
129         Dump stats for a route entry in the table.
130 */
131 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
132         int*    counter;
133         rtable_ent_t* rte;                      // thing is really an rte
134         int             mtype;
135         int             sid;
136
137         if( (rte = (rtable_ent_t *) thing) == NULL ) {
138                 return;
139         }
140
141         if( (counter = (int *) vcounter) != NULL ) {
142                 (*counter)++;
143         }
144
145         mtype = rte->key & 0xffff;
146         sid = (int) (rte->key >> 32);
147
148         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
149 }
150
151 /*
152         Given a route table, cause some stats to be spit out.
153 */
154 static void  rt_stats( route_table_t* rt ) {
155         int* counter;
156
157         if( rt == NULL ) {
158                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
159                 return;
160         }
161
162         counter = (int *) malloc( sizeof( int ) );
163         *counter = 0;
164         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
165         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
166         rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
167         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
168
169         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
170         *counter = 0;
171         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
172         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
173
174         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
175         *counter = 0;
176         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
177         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
178
179         free( counter );
180 }
181
182 /*
183         Given a route table, cause endpoint counters to be written to stderr. The id
184         parm is written as the "source" in the output.
185 */
186 static void  rt_epcounts( route_table_t* rt, char* id ) {
187         if( rt == NULL ) {
188                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
189                 return;
190         }
191
192         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
193 }
194
195
196 // ------------------------------------------------------------------------------------------------
197 /*
198         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
199         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
200 */
201 static char* clip( char* buf ) {
202         char*   tok;
203
204         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
205                 buf++;
206         }
207
208         if( (tok = strchr( buf, '#' )) != NULL ) {
209                 if( tok == buf ) {
210                         return buf;                                     // just push back; leading comment sym handled there
211                 }
212
213                 if( isspace( *(tok-1) ) ) {
214                         *tok = 0;
215                 }
216         }
217
218         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
219         *(tok+1) = 0;
220
221         return buf;
222 }
223
224 /*
225         This accepts a pointer to a nil terminated string, and ensures that there is a
226         newline as the last character. If there is not, a new buffer is allocated and
227         the newline is added.  If a new buffer is allocated, the buffer passed in is
228         freed.  The function returns a pointer which the caller should use, and must
229         free.  In the event of an error, a nil pointer is returned.
230 */
231 static char* ensure_nlterm( char* buf ) {
232         char*   nb = NULL;
233         int             len = 1;
234
235         nb = buf;
236         if( buf == NULL || (len = strlen( buf )) < 2 ) {
237                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
238                         *nb = '\n';
239                         *(nb+1) = 0;
240                 }
241         } else {
242                 if( buf[len-1] != '\n' ) {
243                         rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
244                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
245                                 memcpy( nb, buf, len );
246                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
247                                 *(nb+len+1) = 0;
248                         }
249
250                         free( buf );
251                 }
252         }
253
254         return nb;
255 }
256
257 /*
258         Given a message type create a route table entry and add to the hash keyed on the
259         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
260         is the number of group slots to allocate in the entry.
261 */
262 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
263         rtable_ent_t* rte;
264         rtable_ent_t* old_rte;          // entry which was already in the table for the key
265
266         if( rt == NULL ) {
267                 return NULL;
268         }
269
270         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
271                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
272                 return NULL;
273         }
274         memset( rte, 0, sizeof( *rte ) );
275         rte->refs = 1;
276         rte->key = key;
277
278         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
279                 nrrgroups = 10;
280         }
281
282         if( nrrgroups ) {
283                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
284                         free( rte );
285                         return NULL;
286                 }
287                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
288         } else {
289                 rte->rrgroups = NULL;
290         }
291
292         rte->nrrgroups = nrrgroups;
293
294         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
295                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
296         }
297
298         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
299
300         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
301         return rte;
302 }
303
304 /*
305         This accepts partially parsed information from an rte or mse record sent by route manager or read from
306         a file such that:
307                 ts_field is the msg-type,sender field
308                 subid is the integer subscription id
309                 rr_field is the endpoint information for round robening message over
310
311         If all goes well, this will add an RTE to the table under construction.
312
313         The ts_field is checked to see if we should ingest this record. We ingest if one of
314         these is true:
315                 there is no sender info (a generic entry for all)
316                 there is sender and our host:port matches one of the senders
317                 the sender info is an IP address that matches one of our IP addresses
318 */
319 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
320         rtable_ent_t*   rte;            // route table entry added
321         char*   tok;
322         int             ntoks;
323         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
324         char*   tokens[128];
325         char*   gtokens[64];
326         int             i;
327         int             ngtoks;                         // number of tokens in the group list
328         int             grp;                            // index into group list
329
330         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
331         rr_field = clip( rr_field );
332
333         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
334                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
335                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
336
337                 key = build_rt_key( subid, atoi( ts_field ) );
338
339                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
340
341                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
342                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
343                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
344                         }
345                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
346                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
347
348                         for( grp = 0; grp < ngtoks; grp++ ) {
349                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
350                                         for( i = 0; i < ntoks; i++ ) {
351                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
352                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
353                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
354                                                 }
355                                         }
356                                 }
357                         }
358                 }
359         } else {
360                 if( DEBUG || (vlevel > 2) ) {
361                         rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
362                 }
363         }
364 }
365
366 /*
367         Trash_entry takes a partially parsed record from the input and
368         will delete the entry if the sender,mtype matches us or it's a
369         generic mtype. The refernce in the new table is removed and the
370         refcounter for the actual rte is decreased. If that ref count is
371         0 then the memory is freed (handled byh the del_rte call).
372 */
373 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
374         rtable_ent_t*   rte;            // route table entry to be 'deleted'
375         char*   tok;
376         int             ntoks;
377         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
378         char*   tokens[128];
379
380         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
381                 return;
382         }
383
384         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
385
386         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
387                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
388                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
389
390                 key = build_rt_key( subid, atoi( ts_field ) );
391                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
392                 if( rte != NULL ) {
393                         if( DEBUG || (vlevel > 1) ) {
394                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
395                         }
396                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
397                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
398                 } else {
399                         if( DEBUG || (vlevel > 1) ) {
400                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
401                         }
402                 }
403         } else {
404                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
405         }
406 }
407
408 /*
409         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
410         the 'owner' which should be the dns name or IP address of an enpoint
411         the meid_list is a space separated list of me IDs
412
413         This function assumes the caller has vetted the pointers as needed.
414
415         For each meid in the list, an entry is pushed into the hash which references the owner
416         endpoint such that when the meid is used to route a message it references the endpoint
417         to send messages to.
418 */
419 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
420         char*   tok;
421         int             ntoks;
422         char*   tokens[128];
423         int             i;
424         int             state;
425         endpoint_t*     ep;                                             // endpoint struct for the owner
426
427         owner = clip( owner );                          // ditch extra whitespace and trailing comments
428         meid_list = clip( meid_list );
429
430         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
431         for( i = 0; i < ntoks; i++ ) {
432                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
433                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
434                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
435                 } else {
436                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
437                 }
438         }
439 }
440
441 /*
442         Given the tokens from an mme_del, delete the listed meid entries from the new
443         table. The list is a space separated list of meids.
444
445         The meids in the hash reference endpoints which are never deleted and so
446         the only thing that we need to do here is to remove the meid from the hash.
447
448         This function assumes the caller has vetted the pointers as needed.
449 */
450 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
451         char*   tok;
452         int             ntoks;
453         char*   tokens[128];
454         int             i;
455
456         if( rtab->hash == NULL ) {
457                 return;
458         }
459
460         meid_list = clip( meid_list );
461
462         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
463         for( i = 0; i < ntoks; i++ ) {
464                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
465                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
466         }
467 }
468
469 /*
470         Parse a partially parsed meid record. Tokens[0] should be one of:
471                 meid_map, mme_ar, mme_del.
472 */
473 static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
474         if( tokens == NULL || ntoks < 1 ) {
475                 return;                                                 // silent but should never happen
476         }
477
478         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
479                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
480                 return;
481         }
482
483         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
484                 tokens[1] = clip( tokens[1] );
485                 if( *(tokens[1]) == 's' ) {
486                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
487                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
488                                 uta_rt_drop( ctx->new_rtable );
489                         }
490
491                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
492                         ctx->new_rtable->mupdates = 0;
493                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
494                 } else {
495                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
496                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
497                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
498                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
499                                                 uta_rt_drop( ctx->new_rtable );
500                                                 ctx->new_rtable = NULL;
501                                                 return;
502                                         }
503
504                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
505                                 }
506
507                                 if( ctx->new_rtable ) {
508                                         uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
509                                         ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
510                                         ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
511                                         ctx->new_rtable = NULL;
512                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
513
514                                         if( vlevel > 0 ) {
515                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
516                                                 rt_stats( ctx->old_rtable );
517                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
518                                                 rt_stats( ctx->rtable );
519                                         }
520                                 } else {
521                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
522                                         ctx->new_rtable = NULL;
523                                 }
524                         }
525                 }
526
527                 return;
528         }       
529
530         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
531                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
532                 return;
533         }
534
535         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
536                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
537                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
538                         return;
539                 }
540                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
541                 ctx->new_rtable->mupdates++;
542         }
543
544         if( strcmp( tokens[0], "mme_del" ) == 0 ) {
545                 if( ntoks < 2 ) {
546                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
547                         return;
548                 }
549                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
550                 ctx->new_rtable->mupdates++;
551         }
552 }
553
554 /*
555         Parse a single record recevied from the route table generator, or read
556         from a static route table file.  Start records cause a new table to
557         be started (if a partial table was received it is discarded. Table
558         entry records are added to the currenly 'in progress' table, and an
559         end record causes the in progress table to be finalised and the
560         currently active table is replaced.
561
562         The updated table will be activated when the *|end record is encountered.
563         However, to allow for a "double" update, where both the meid map and the
564         route table must be updated at the same time, the end indication on a
565         route table (new or update) may specifiy "hold" which indicates that meid
566         map entries are to follow and the updated route table should be held as
567         pending until the end of the meid map is received and validated.
568
569         CAUTION:  we are assuming that there is a single route/meid map generator
570                 and as such only one type of update is received at a time; in other
571                 words, the sender cannot mix update records and if there is more than
572                 one sender process they must synchronise to avoid issues.
573
574
575         For a RT update, we expect:
576                 newrt|{start|end [hold]}
577                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
578                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
579                 mse| <mtype>[,sender] | <sub-id> | %meid
580
581
582         For a meid map update we expect:
583                 meid_map | start
584                 meid_map | end | <count> | <md5-hash>
585                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
586                 mme_del | <meid0> <meid1>...<meidn>
587
588 */
589 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
590         int i;
591         int ntoks;                                                      // number of tokens found in something
592         int ngtoks;
593         int     grp;                                                    // group number
594         rtable_ent_t*   rte;                            // route table entry added
595         char*   tokens[128];
596         char*   tok;                                            // pointer into a token or string
597
598         if( ! buf ) {
599                 return;
600         }
601
602         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
603                 buf++;
604         }
605         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
606         *(tok+1) = 0;
607
608         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
609                 tokens[0] = clip( tokens[0] );
610                 switch( *(tokens[0]) ) {
611                         case 0:                                                                                                 // ignore blanks
612                                 // fallthrough
613                         case '#':                                                                                               // and comment lines
614                                 break;
615
616                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
617                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
618                                         break;
619                                 }
620
621                                 if( ntoks < 3 ) {
622                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
623                                         break;
624                                 }
625
626                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
627                                 ctx->new_rtable->updates++;
628                                 break;
629
630                         case 'n':                                                                                               // newrt|{start|end}
631                                 tokens[1] = clip( tokens[1] );
632                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
633                                         if( ctx->new_rtable ) {
634                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
635                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
636                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
637                                                 ctx->new_rtable = NULL;
638                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
639
640                                                 if( vlevel > 0 ) {
641                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
642                                                         rt_stats( ctx->old_rtable );
643                                                         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
644                                                         rt_stats( ctx->rtable );
645                                                 }
646                                         } else {
647                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
648                                                 ctx->new_rtable = NULL;
649                                         }
650                                 } else {                                                                                        // start a new table.
651                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
652                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
653                                                 uta_rt_drop( ctx->new_rtable );
654                                         }
655
656                                         ctx->new_rtable = NULL;
657                                         ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
658                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
659                                 }
660                                 break;
661
662                         case 'm':                                                               // mse entry or one of the meid_ records
663                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
664                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
665                                                 break;
666                                         }
667
668                                         if( ntoks < 4 ) {
669                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
670                                                 break;
671                                         }
672
673                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
674                                         ctx->new_rtable->updates++;
675                                 } else {
676                                         meid_parser( ctx, tokens, ntoks, vlevel );
677                                 }
678                                 break;
679
680                         case 'r':                                       // assume rt entry
681                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
682                                         break;
683                                 }
684
685                                 ctx->new_rtable->updates++;
686                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
687                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
688                                 } else {
689                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
690                                 }
691                                 break;
692
693                         case 'u':                                                                                               // update current table, not a total replacement
694                                 tokens[1] = clip( tokens[1] );
695                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
696                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
697                                                 break;
698                                         }
699
700                                         if( ntoks >2 ) {
701                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
702                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
703                                                                 ctx->new_rtable->updates, tokens[2] );
704                                                         uta_rt_drop( ctx->new_rtable );
705                                                         ctx->new_rtable = NULL;
706                                                         break;
707                                                 }
708                                         }
709
710                                         if( ctx->new_rtable ) {
711                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
712                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
713                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
714                                                 ctx->new_rtable = NULL;
715                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
716
717                                                 if( vlevel > 0 ) {
718                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
719                                                         rt_stats( ctx->old_rtable );
720                                                         rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
721                                                         rt_stats( ctx->rtable );
722                                                 }
723                                         } else {
724                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
725                                                 ctx->new_rtable = NULL;
726                                         }
727                                 } else {                                                                                        // start a new table.
728                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
729                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
730                                                 uta_rt_drop( ctx->new_rtable );
731                                         }
732
733                                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
734                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
735                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
736                                 }
737                                 break;
738
739                         default:
740                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
741                                 break;
742                 }
743         }
744 }
745
746 /*
747         This function attempts to open a static route table in order to create a 'seed'
748         table during initialisation.  The environment variable RMR_SEED_RT is expected
749         to contain the necessary path to the file. If missing, or if the file is empty,
750         no route table will be available until one is received from the generator.
751
752         This function is probably most useful for testing situations, or extreme
753         cases where the routes are static.
754 */
755 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
756         int             i;
757         char*   fname;
758         char*   fbuf;                           // buffer with file contents
759         char*   rec;                            // start of the record
760         char*   eor;                            // end of the record
761         int             rcount = 0;                     // record count for debug
762
763         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
764                 return;
765         }
766
767         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
768                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
769                 return;
770         }
771
772         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
773         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
774                 if( *eor == '\r' ) {
775                         *eor = '\n';                                                            // will look like a blank line which is ok
776                 }
777         }
778
779         for( rec = fbuf; rec && *rec; rec = eor+1 ) {
780                 rcount++;
781                 if( (eor = strchr( rec, '\n' )) != NULL ) {
782                         *eor = 0;
783                 } else {
784                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
785                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
786                         free( fbuf );
787                         return;
788                 }
789
790                 parse_rt_rec( ctx, rec, vlevel );
791         }
792
793         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
794         free( fbuf );
795 }
796
797 /*
798         Callback driven for each named thing in a symtab. We collect the pointers to those
799         things for later use (cloning).
800 */
801 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
802         thing_list_t*   tl;
803
804         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
805                 return;
806         }
807
808         if( thing == NULL ) {
809                 return;
810         }
811
812         tl->names[tl->nused] = name;                    // the name/key
813         tl->things[tl->nused++] = thing;                // save a reference to the thing
814 }
815
816 /*
817         Called to delete a route table entry struct. We delete the array of endpoint
818         pointers, but NOT the endpoints referenced as those are referenced from
819         multiple entries.
820
821         Route table entries can be concurrently referenced by multiple symtabs, so
822         the actual delete happens only if decrementing the rte's ref count takes it
823         to 0. Thus, it is safe to call this function across a symtab when cleaning up
824         the symtab, or overlaying an entry.
825
826         This function uses ONLY the pointer to the rte (thing) and ignores the other
827         information that symtab foreach function passes (st, entry, and data) which
828         means that it _can_ safetly be used outside of the foreach setting. If
829         the function is changed to depend on any of these three, then a stand-alone
830         rte_cleanup() function should be added and referenced by this, and refererences
831         to this outside of the foreach world should be changed.
832 */
833 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
834         rtable_ent_t*   rte;
835         int i;
836
837         if( (rte = (rtable_ent_t *) thing) == NULL ) {
838                 return;
839         }
840
841         rte->refs--;
842         if( rte->refs > 0 ) {                   // something still referencing, so it lives
843                 return;
844         }
845
846         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
847                 for( i = 0; i < rte->nrrgroups; i++ ) {
848                         if( rte->rrgroups[i] ) {
849                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
850                         }
851                 }
852
853                 free( rte->rrgroups );
854         }
855
856         free( rte );                                                                                    // finally, drop the potato
857 }
858
859 /*
860         Read an entire file into a buffer. We assume for route table files
861         they will be smallish and so this won't be a problem.
862         Returns a pointer to the buffer, or nil. Caller must free.
863         Terminates the buffer with a nil character for string processing.
864
865         If we cannot stat the file, we assume it's empty or missing and return
866         an empty buffer, as opposed to a nil, so the caller can generate defaults
867         or error if an empty/missing file isn't tolerated.
868 */
869 static char* uta_fib( char* fname ) {
870         struct stat     stats;
871         off_t           fsize = 8192;   // size of the file
872         off_t           nread;                  // number of bytes read
873         int                     fd;
874         char*           buf;                    // input buffer
875
876         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
877                 if( fstat( fd, &stats ) >= 0 ) {
878                         if( stats.st_size <= 0 ) {                                      // empty file
879                                 close( fd );
880                                 fd = -1;
881                         } else {
882                                 fsize = stats.st_size;                                          // stat ok, save the file size
883                         }
884                 } else {
885                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
886                 }
887         }
888
889         if( fd < 0 ) {                                                                                  // didn't open or empty
890                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
891                         return NULL;
892                 }
893
894                 *buf = 0;
895                 return buf;
896         }
897
898         // add a size limit check here
899
900         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
901                 close( fd );
902                 errno = ENOMEM;
903                 return NULL;
904         }
905
906         nread = read( fd, buf, fsize );
907         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
908                 free( buf );
909                 errno = EFBIG;                                                                                  // likely too much to handle
910                 close( fd );
911                 return NULL;
912         }
913
914         buf[nread] = 0;
915
916         close( fd );
917         return buf;
918 }
919
920 /*
921         Create and initialise a route table; Returns a pointer to the table struct.
922 */
923 static route_table_t* uta_rt_init( ) {
924         route_table_t*  rt;
925
926         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
927                 return NULL;
928         }
929
930         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
931                 free( rt );
932                 return NULL;
933         }
934
935         return rt;
936 }
937
938 /*
939         Clones one of the spaces in the given table.
940         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
941         Space is the space in the old table to copy. Space 0 uses an integer key and
942         references rte structs. All other spaces use a string key and reference endpoints.
943 */
944 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
945         endpoint_t*             ep;             // an endpoint
946         rtable_ent_t*   rte;    // a route table entry
947         void*   sst;                    // source symtab
948         void*   nst;                    // new symtab
949         thing_list_t things;    // things from the space to copy
950         int             i;
951         int             free_on_err = 0;
952
953         if( nrt == NULL ) {                             // make a new table if needed
954                 free_on_err = 1;
955                 nrt = uta_rt_init();
956         }
957
958         if( srt == NULL ) {             // source was nil, just give back the new table
959                 return nrt;
960         }
961
962         things.nalloc = 2048;
963         things.nused = 0;
964         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
965         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
966         if( things.things == NULL ) {
967                 if( free_on_err ) {
968                         free( nrt->hash );
969                         free( nrt );
970                         nrt = NULL;
971                 }
972
973                 return nrt;
974         }
975
976         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
977         nst = nrt->hash;
978
979         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
980
981         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
982         for( i = 0; i < things.nused; i++ ) {
983                 if( space ) {                                                                                           // string key, epoint reference
984                         ep = (endpoint_t *) things.things[i];
985                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
986                 } else {
987                         rte = (rtable_ent_t *) things.things[i];
988                         rte->refs++;                                                                                    // rtes can be removed, so we track references
989                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
990                 }
991         }
992
993         free( things.things );
994         free( (void *) things.names );
995         return nrt;
996 }
997
998 /*
999         Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1000         The endpoint and meid entries in the hash must be preserved.
1001 */
1002 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1003         endpoint_t*             ep;                             // an endpoint
1004         rtable_ent_t*   rte;                    // a route table entry
1005         route_table_t*  nrt = NULL;             // new route table
1006         int i;
1007
1008         if( srt == NULL ) {
1009                 return uta_rt_init();           // no source to clone, just return an empty table
1010         }
1011
1012         nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
1013         rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
1014
1015         return nrt;
1016 }
1017
1018 /*
1019         Creates a new route table and then clones  _all_ of the given route table (references 
1020         both endpoints AND the route table entries. Needed to support a partial update where 
1021         some route table entries will not be deleted if not explicitly in the update and when 
1022         we are adding/replacing meid references.
1023 */
1024 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1025         endpoint_t*             ep;                             // an endpoint
1026         rtable_ent_t*   rte;                    // a route table entry
1027         route_table_t*  nrt = NULL;             // new route table
1028         int i;
1029
1030         if( srt == NULL ) {
1031                 return uta_rt_init();           // no source to clone, just return an empty table
1032         }
1033
1034         nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
1035         rt_clone_space( srt, nrt, RT_NAME_SPACE );
1036         rt_clone_space( srt, nrt, RT_ME_SPACE );
1037
1038         return nrt;
1039 }
1040
1041 /*
1042         Given a name, find the endpoint struct in the provided route table.
1043 */
1044 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1045
1046         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1047                 return NULL;
1048         }
1049
1050         return rmr_sym_get( rt->hash, ep_name, 1 );
1051 }
1052
1053 /*
1054         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1055 */
1056 static void uta_rt_drop( route_table_t* rt ) {
1057         if( rt == NULL ) {
1058                 return;
1059         }
1060
1061         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1062         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1063         free( rt );
1064 }
1065
1066 /*
1067         Look up and return the pointer to the endpoint stuct matching the given name.
1068         If not in the hash, a new endpoint is created, added to the hash. Should always
1069         return a pointer.
1070 */
1071 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1072         endpoint_t*     ep;
1073
1074         if( !rt || !ep_name || ! *ep_name ) {
1075                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1076                 errno = EINVAL;
1077                 return NULL;
1078         }
1079
1080         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1081                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1082                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1083                         errno = ENOMEM;
1084                         return NULL;
1085                 }
1086
1087                 ep->notify = 1;                                                         // show notification on first connection failure
1088                 ep->open = 0;                                                           // not connected
1089                 ep->addr = uta_h2ip( ep_name );
1090                 ep->name = strdup( ep_name );
1091                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1092                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1093
1094                 rmr_sym_put( rt->hash, ep_name, 1, ep );
1095         }
1096
1097         return ep;
1098 }
1099
1100
1101 /*
1102         Given a session id and message type build a key that can be used to look up the rte in the route
1103         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1104 */
1105 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1106         uint64_t key;
1107
1108         if( sub_id == UNSET_SUBID ) {
1109                 key = 0xffffffff00000000 | mtype;
1110         } else {
1111                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1112         }
1113
1114         return key;
1115 }
1116
1117
1118 #endif