Allow RTS calls prior to initial route table load
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49 #include <RIC_message_types.h>          // needed for route manager messages
50
51
52 /*
53         Passed to a symtab foreach callback to construct a list of pointers from
54         a current symtab.
55 */
56 typedef struct thing_list {
57         int nalloc;
58         int nused;
59         void** things;
60         const char** names;
61 } thing_list_t;
62
63 // ---- debugging/testing -------------------------------------------------------------------------
64
65 /*
66         Dump some stats for an endpoint in the RT. This is generally called to
67         verify endpoints after a table load/change.
68 */
69 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
70         int*    counter;
71         endpoint_t* ep;
72
73         if( (ep = (endpoint_t *) thing) == NULL ) {
74                 return;
75         }
76
77         if( (counter = (int *) vcounter) != NULL ) {
78                 (*counter)++;
79         }
80
81         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
82 }
83
84 /*
85         Called to count meid entries in the table. The meid points to an 'owning' endpoint
86         so we can list what we find
87 */
88 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
89         int*    counter;
90         endpoint_t* ep;
91
92         if( (ep = (endpoint_t *) thing) == NULL ) {
93                 return;
94         }
95
96         if( (counter = (int *) vcounter) != NULL ) {
97                 (*counter)++;
98         }
99
100         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
101 }
102
103 /*
104         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
105         the 'source' information and is added to each message.
106 */
107 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
108         endpoint_t* ep;
109         char*   id;
110
111         if( (ep = (endpoint_t *) thing) == NULL ) {
112                 return;
113         }
114
115         if( (id = (char *) vid) == NULL ) {
116                 id = "missing";
117         }
118
119         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
120                 (long long) time( NULL ),
121                 id,
122                 ep->name,
123                 ep->open,
124                 ep->scounts[EPSC_GOOD],
125                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
126                 ep->scounts[EPSC_FAIL],
127                 ep->scounts[EPSC_TRANS]   );
128 }
129
130 /*
131         Dump stats for a route entry in the table.
132 */
133 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
134         int*    counter;
135         rtable_ent_t* rte;                      // thing is really an rte
136         int             mtype;
137         int             sid;
138
139         if( (rte = (rtable_ent_t *) thing) == NULL ) {
140                 return;
141         }
142
143         if( (counter = (int *) vcounter) != NULL ) {
144                 (*counter)++;
145         }
146
147         mtype = rte->key & 0xffff;
148         sid = (int) (rte->key >> 32);
149
150         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
151 }
152
153 /*
154         Given a route table, cause some stats to be spit out.
155 */
156 static void  rt_stats( route_table_t* rt ) {
157         int* counter;
158
159         if( rt == NULL ) {
160                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
161                 return;
162         }
163
164         counter = (int *) malloc( sizeof( int ) );
165         *counter = 0;
166         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
167         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
168         rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
169         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
170
171         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
172         *counter = 0;
173         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
174         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
175
176         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
177         *counter = 0;
178         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
179         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
180
181         free( counter );
182 }
183
184 /*
185         Given a route table, cause endpoint counters to be written to stderr. The id
186         parm is written as the "source" in the output.
187 */
188 static void  rt_epcounts( route_table_t* rt, char* id ) {
189         if( rt == NULL ) {
190                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
191                 return;
192         }
193
194         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
195 }
196
197 // ------------ route manager communication -------------------------------------------------
198 /*
199         Send a request for a table update to the route manager. Updates come in
200         async, so send and go.
201
202         pctx is the private context for the thread; ctx is the application context
203         that we need to be able to send the application ID in case rt mgr needs to
204         use it to idenfity us.
205
206         Returns 0 if we were not able to send a request.
207 */
208 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
209         rmr_mbuf_t*     smsg;
210         int     state = 0;
211
212         if( ctx->rtg_whid < 0 ) {
213                 return state;
214         }
215
216         smsg = rmr_alloc_msg( pctx, 1024 );
217         if( smsg != NULL ) {
218                 smsg->mtype = RMRRM_REQ_TABLE;
219                 smsg->sub_id = 0;
220                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
221                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
222                 smsg->len = strlen( smsg->payload ) + 1;
223         
224                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
225                 if( (state = smsg->state) != RMR_OK ) {
226                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
227                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
228                         ctx->rtg_whid = -1;
229                 }
230
231                 rmr_free_msg( smsg );
232         }
233
234         return state;
235 }
236
237 /*
238         Send an ack to the route table manager for a table ID that we are
239         processing.      State is 1 for OK, and 0 for failed. Reason might 
240         be populated if we know why there was a failure.
241
242         Context should be the PRIVATE context that we use for messages
243         to route manger and NOT the user's context.
244
245         If a message buffere is passed we use that and use return to sender
246         assuming that this might be a response to a call and that is needed
247         to send back to the proper calling thread. If msg is nil, we allocate
248         and use it.
249 */
250 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
251         int             use_rts = 1;
252         int             payload_size = 1024;
253         
254         if( ctx == NULL || ctx->rtg_whid < 0 ) {
255                 return;
256         }
257
258         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
259                 return;
260         }
261
262         if( smsg != NULL ) {
263                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
264         } else {
265                 use_rts = 0;
266                 smsg = rmr_alloc_msg( ctx, payload_size );
267         }
268
269         if( smsg != NULL ) {
270                 smsg->mtype = RMRRM_TABLE_STATE;
271                 smsg->sub_id = -1;
272                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
273                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
274
275                 smsg->len = strlen( smsg->payload ) + 1;
276         
277                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
278                 if( use_rts ) {
279                         smsg = rmr_rts_msg( ctx, smsg );
280                 } else {
281                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
282                 }
283                 if( (state = smsg->state) != RMR_OK ) {
284                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
285                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
286                         ctx->rtg_whid = -1;
287                 }
288
289                 if( ! use_rts ) {
290                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
291                 }
292         }
293 }
294
295 // ------------------------------------------------------------------------------------------------
296 /*
297         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
298         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
299 */
300 static char* clip( char* buf ) {
301         char*   tok;
302
303         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
304                 buf++;
305         }
306
307         if( (tok = strchr( buf, '#' )) != NULL ) {
308                 if( tok == buf ) {
309                         return buf;                                     // just push back; leading comment sym handled there
310                 }
311
312                 if( isspace( *(tok-1) ) ) {
313                         *tok = 0;
314                 }
315         }
316
317         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
318         *(tok+1) = 0;
319
320         return buf;
321 }
322
323 /*
324         This accepts a pointer to a nil terminated string, and ensures that there is a
325         newline as the last character. If there is not, a new buffer is allocated and
326         the newline is added.  If a new buffer is allocated, the buffer passed in is
327         freed.  The function returns a pointer which the caller should use, and must
328         free.  In the event of an error, a nil pointer is returned.
329 */
330 static char* ensure_nlterm( char* buf ) {
331         char*   nb = NULL;
332         int             len = 1;
333
334         nb = buf;
335         if( buf == NULL || (len = strlen( buf )) < 2 ) {
336                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
337                         *nb = '\n';
338                         *(nb+1) = 0;
339                 }
340         } else {
341                 if( buf[len-1] != '\n' ) {
342                         rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
343                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
344                                 memcpy( nb, buf, len );
345                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
346                                 *(nb+len+1) = 0;
347                         }
348
349                         free( buf );
350                 }
351         }
352
353         return nb;
354 }
355
356 /*
357         Given a message type create a route table entry and add to the hash keyed on the
358         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
359         is the number of group slots to allocate in the entry.
360 */
361 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
362         rtable_ent_t* rte;
363         rtable_ent_t* old_rte;          // entry which was already in the table for the key
364
365         if( rt == NULL ) {
366                 return NULL;
367         }
368
369         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
370                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
371                 return NULL;
372         }
373         memset( rte, 0, sizeof( *rte ) );
374         rte->refs = 1;
375         rte->key = key;
376
377         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
378                 nrrgroups = 10;
379         }
380
381         if( nrrgroups ) {
382                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
383                         free( rte );
384                         return NULL;
385                 }
386                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
387         } else {
388                 rte->rrgroups = NULL;
389         }
390
391         rte->nrrgroups = nrrgroups;
392
393         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
394                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
395         }
396
397         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
398
399         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
400         return rte;
401 }
402
403 /*
404         This accepts partially parsed information from an rte or mse record sent by route manager or read from
405         a file such that:
406                 ts_field is the msg-type,sender field
407                 subid is the integer subscription id
408                 rr_field is the endpoint information for round robening message over
409
410         If all goes well, this will add an RTE to the table under construction.
411
412         The ts_field is checked to see if we should ingest this record. We ingest if one of
413         these is true:
414                 there is no sender info (a generic entry for all)
415                 there is sender and our host:port matches one of the senders
416                 the sender info is an IP address that matches one of our IP addresses
417 */
418 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
419         rtable_ent_t*   rte;            // route table entry added
420         char*   tok;
421         int             ntoks;
422         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
423         char*   tokens[128];
424         char*   gtokens[64];
425         int             i;
426         int             ngtoks;                         // number of tokens in the group list
427         int             grp;                            // index into group list
428
429         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
430         rr_field = clip( rr_field );
431
432         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
433                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
434                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
435
436                 key = build_rt_key( subid, atoi( ts_field ) );
437
438                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
439
440                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
441                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
442                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
443                         }
444                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
445                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
446
447                         for( grp = 0; grp < ngtoks; grp++ ) {
448                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
449                                         for( i = 0; i < ntoks; i++ ) {
450                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
451                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
452                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
453                                                 }
454                                         }
455                                 }
456                         }
457                 }
458         } else {
459                 if( DEBUG || (vlevel > 2) ) {
460                         rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
461                 }
462         }
463 }
464
465 /*
466         Trash_entry takes a partially parsed record from the input and
467         will delete the entry if the sender,mtype matches us or it's a
468         generic mtype. The refernce in the new table is removed and the
469         refcounter for the actual rte is decreased. If that ref count is
470         0 then the memory is freed (handled byh the del_rte call).
471 */
472 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
473         rtable_ent_t*   rte;            // route table entry to be 'deleted'
474         char*   tok;
475         int             ntoks;
476         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
477         char*   tokens[128];
478
479         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
480                 return;
481         }
482
483         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
484
485         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
486                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
487                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
488
489                 key = build_rt_key( subid, atoi( ts_field ) );
490                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
491                 if( rte != NULL ) {
492                         if( DEBUG || (vlevel > 1) ) {
493                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
494                         }
495                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
496                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
497                 } else {
498                         if( DEBUG || (vlevel > 1) ) {
499                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
500                         }
501                 }
502         } else {
503                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
504         }
505 }
506
507 /*
508         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
509         the 'owner' which should be the dns name or IP address of an enpoint
510         the meid_list is a space separated list of me IDs
511
512         This function assumes the caller has vetted the pointers as needed.
513
514         For each meid in the list, an entry is pushed into the hash which references the owner
515         endpoint such that when the meid is used to route a message it references the endpoint
516         to send messages to.
517 */
518 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
519         char*   tok;
520         int             ntoks;
521         char*   tokens[128];
522         int             i;
523         int             state;
524         endpoint_t*     ep;                                             // endpoint struct for the owner
525
526         owner = clip( owner );                          // ditch extra whitespace and trailing comments
527         meid_list = clip( meid_list );
528
529         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
530         for( i = 0; i < ntoks; i++ ) {
531                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
532                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
533                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
534                 } else {
535                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
536                 }
537         }
538 }
539
540 /*
541         Given the tokens from an mme_del, delete the listed meid entries from the new
542         table. The list is a space separated list of meids.
543
544         The meids in the hash reference endpoints which are never deleted and so
545         the only thing that we need to do here is to remove the meid from the hash.
546
547         This function assumes the caller has vetted the pointers as needed.
548 */
549 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
550         char*   tok;
551         int             ntoks;
552         char*   tokens[128];
553         int             i;
554
555         if( rtab->hash == NULL ) {
556                 return;
557         }
558
559         meid_list = clip( meid_list );
560
561         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
562         for( i = 0; i < ntoks; i++ ) {
563                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
564                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
565         }
566 }
567
568 /*
569         Parse a partially parsed meid record. Tokens[0] should be one of:
570                 meid_map, mme_ar, mme_del.
571
572         pctx is the private context needed to return an ack/nack using the provided
573         message buffer with the route managers address info.
574 */
575 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
576         char wbuf[1024];
577
578         if( tokens == NULL || ntoks < 1 ) {
579                 return;                                                 // silent but should never happen
580         }
581
582         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
583                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
584                 return;
585         }
586
587         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
588                 tokens[1] = clip( tokens[1] );
589                 if( *(tokens[1]) == 's' ) {
590                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
591                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
592                                 uta_rt_drop( ctx->new_rtable );
593                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as end never made it
594                         }
595
596                         if( ctx->table_id != NULL ) {
597                                 free( ctx->table_id );
598                         }
599                         if( ntoks >2 ) {
600                                 ctx->table_id = strdup( clip( tokens[2] ) );
601                         } else {
602                                 ctx->table_id = NULL;
603                         }
604                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
605                         ctx->new_rtable->mupdates = 0;
606                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
607                 } else {
608                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
609                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
610                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
611                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
612                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
613                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
614                                                 uta_rt_drop( ctx->new_rtable );
615                                                 ctx->new_rtable = NULL;
616                                                 return;
617                                         }
618
619                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
620                                 }
621
622                                 if( ctx->new_rtable ) {
623                                         uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
624                                         ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
625                                         ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
626                                         ctx->new_rtable = NULL;
627                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
628                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
629
630                                         if( vlevel > 0 ) {
631                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
632                                                 rt_stats( ctx->old_rtable );
633                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
634                                                 rt_stats( ctx->rtable );
635                                         }
636                                 } else {
637                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
638                                         ctx->new_rtable = NULL;
639                                 }
640                         }
641                 }
642
643                 return;
644         }       
645
646         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
647                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
648                 return;
649         }
650
651         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
652                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
653                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
654                         return;
655                 }
656                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
657                 ctx->new_rtable->mupdates++;
658         }
659
660         if( strcmp( tokens[0], "mme_del" ) == 0 ) {
661                 if( ntoks < 2 ) {
662                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
663                         return;
664                 }
665                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
666                 ctx->new_rtable->mupdates++;
667         }
668 }
669
670 /*
671         Parse a single record recevied from the route table generator, or read
672         from a static route table file.  Start records cause a new table to
673         be started (if a partial table was received it is discarded. Table
674         entry records are added to the currenly 'in progress' table, and an
675         end record causes the in progress table to be finalised and the
676         currently active table is replaced.
677
678         The updated table will be activated when the *|end record is encountered.
679         However, to allow for a "double" update, where both the meid map and the
680         route table must be updated at the same time, the end indication on a
681         route table (new or update) may specifiy "hold" which indicates that meid
682         map entries are to follow and the updated route table should be held as
683         pending until the end of the meid map is received and validated.
684
685         CAUTION:  we are assuming that there is a single route/meid map generator
686                 and as such only one type of update is received at a time; in other
687                 words, the sender cannot mix update records and if there is more than
688                 one sender process they must synchronise to avoid issues.
689
690
691         For a RT update, we expect:
692                 newrt | start | <table-id>
693                 newrt | end | <count>
694                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
695                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
696                 mse| <mtype>[,sender] | <sub-id> | %meid
697
698
699         For a meid map update we expect:
700                 meid_map | start | <table-id>
701                 meid_map | end | <count> | <md5-hash>
702                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
703                 mme_del | <meid0> <meid1>...<meidn>
704
705
706         The pctx is our private context that must be used to send acks/status
707         messages back to the route manager.  The regular ctx is the ctx that
708         the user has been given and thus that's where we have to hang the route
709         table we're working with.
710
711         If mbuf is given, and we need to ack, then we ack using the mbuf and a
712         return to sender call (allows route manager to use wh_call() to send
713         an update and rts is required to get that back to the right thread).
714         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
715         will be used.
716 */
717 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
718         int i;
719         int ntoks;                                                      // number of tokens found in something
720         int ngtoks;
721         int     grp;                                                    // group number
722         rtable_ent_t*   rte;                            // route table entry added
723         char*   tokens[128];
724         char*   tok;                                            // pointer into a token or string
725         char    wbuf[1024];
726
727         if( ! buf ) {
728                 return;
729         }
730
731         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
732                 buf++;
733         }
734         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
735         *(tok+1) = 0;
736
737         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
738                 tokens[0] = clip( tokens[0] );
739                 switch( *(tokens[0]) ) {
740                         case 0:                                                                                                 // ignore blanks
741                                 // fallthrough
742                         case '#':                                                                                               // and comment lines
743                                 break;
744
745                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
746                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
747                                         break;
748                                 }
749
750                                 if( ntoks < 3 ) {
751                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
752                                         break;
753                                 }
754
755                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
756                                 ctx->new_rtable->updates++;
757                                 break;
758
759                         case 'n':                                                                                               // newrt|{start|end}
760                                 tokens[1] = clip( tokens[1] );
761                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
762                                         if( ntoks >2 ) {
763                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
764                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
765                                                                 ctx->new_rtable->updates, tokens[2] );
766                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
767                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
768                                                         uta_rt_drop( ctx->new_rtable );
769                                                         ctx->new_rtable = NULL;
770                                                         break;
771                                                 }
772                                         }
773
774                                         if( ctx->new_rtable ) {
775                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
776                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
777                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
778                                                 ctx->new_rtable = NULL;
779                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
780
781                                                 if( vlevel > 0 ) {
782                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
783                                                         rt_stats( ctx->old_rtable );
784                                                         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
785                                                         rt_stats( ctx->rtable );
786                                                 }
787
788                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
789                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
790                                         } else {
791                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
792                                                 ctx->new_rtable = NULL;
793                                         }
794                                 } else {                                                                                                                        // start a new table.
795                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
796                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
797
798                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
799                                                 uta_rt_drop( ctx->new_rtable );
800                                         }
801
802                                         if( ctx->table_id != NULL ) {
803                                                 free( ctx->table_id );
804                                         }
805                                         if( ntoks >2 ) {
806                                                 ctx->table_id = strdup( clip( tokens[2] ) );
807                                         } else {
808                                                 ctx->table_id = NULL;
809                                         }
810
811                                         ctx->new_rtable = NULL;
812                                         ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
813                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
814                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
815                                 }
816                                 break;
817
818                         case 'm':                                                                       // mse entry or one of the meid_ records
819                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
820                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
821                                                 break;
822                                         }
823
824                                         if( ntoks < 4 ) {
825                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
826                                                 break;
827                                         }
828
829                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
830                                         ctx->new_rtable->updates++;
831                                 } else {
832                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
833                                 }
834                                 break;
835
836                         case 'r':                                       // assume rt entry
837                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
838                                         break;
839                                 }
840
841                                 ctx->new_rtable->updates++;
842                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
843                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
844                                 } else {
845                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
846                                 }
847                                 break;
848
849                         case 'u':                                                                                               // update current table, not a total replacement
850                                 tokens[1] = clip( tokens[1] );
851                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
852                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
853                                                 break;
854                                         }
855
856                                         if( ntoks >2 ) {
857                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
858                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
859                                                                 ctx->new_rtable->updates, tokens[2] );
860                                                         uta_rt_drop( ctx->new_rtable );
861                                                         ctx->new_rtable = NULL;
862                                                         break;
863                                                 }
864                                         }
865
866                                         if( ctx->new_rtable ) {
867                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
868                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
869                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
870                                                 ctx->new_rtable = NULL;
871                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
872
873                                                 if( vlevel > 0 ) {
874                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
875                                                         rt_stats( ctx->old_rtable );
876                                                         rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
877                                                         rt_stats( ctx->rtable );
878                                                 }
879                                         } else {
880                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
881                                                 ctx->new_rtable = NULL;
882                                         }
883                                 } else {                                                                                        // start a new table.
884                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
885                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
886                                                 uta_rt_drop( ctx->new_rtable );
887                                         }
888
889                                         if( ntoks >2 ) {
890                                                 if( ctx->table_id != NULL ) {
891                                                         free( ctx->table_id );
892                                                 }
893                                                 ctx->table_id = strdup( clip( tokens[2] ) );
894                                         }
895
896                                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
897                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
898                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
899                                 }
900                                 break;
901
902                         default:
903                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
904                                 break;
905                 }
906         }
907 }
908
909 /*
910         This function attempts to open a static route table in order to create a 'seed'
911         table during initialisation.  The environment variable RMR_SEED_RT is expected
912         to contain the necessary path to the file. If missing, or if the file is empty,
913         no route table will be available until one is received from the generator.
914
915         This function is probably most useful for testing situations, or extreme
916         cases where the routes are static.
917 */
918 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
919         int             i;
920         char*   fname;
921         char*   fbuf;                           // buffer with file contents
922         char*   rec;                            // start of the record
923         char*   eor;                            // end of the record
924         int             rcount = 0;                     // record count for debug
925
926         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
927                 return;
928         }
929
930         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
931                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
932                 return;
933         }
934
935         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
936         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
937                 if( *eor == '\r' ) {
938                         *eor = '\n';                                                            // will look like a blank line which is ok
939                 }
940         }
941
942         rec = fbuf;
943         while( rec && *rec ) {
944                 rcount++;
945                 if( (eor = strchr( rec, '\n' )) != NULL ) {
946                         *eor = 0;
947                 } else {
948                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
949                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
950                         free( fbuf );
951                         return;
952                 }
953
954                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
955
956                 rec = eor+1;
957         }
958
959         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
960         free( fbuf );
961 }
962
963 /*
964         Callback driven for each named thing in a symtab. We collect the pointers to those
965         things for later use (cloning).
966 */
967 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
968         thing_list_t*   tl;
969
970         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
971                 return;
972         }
973
974         if( thing == NULL ) {
975                 return;
976         }
977
978         tl->names[tl->nused] = name;                    // the name/key
979         tl->things[tl->nused++] = thing;                // save a reference to the thing
980 }
981
982 /*
983         Called to delete a route table entry struct. We delete the array of endpoint
984         pointers, but NOT the endpoints referenced as those are referenced from
985         multiple entries.
986
987         Route table entries can be concurrently referenced by multiple symtabs, so
988         the actual delete happens only if decrementing the rte's ref count takes it
989         to 0. Thus, it is safe to call this function across a symtab when cleaning up
990         the symtab, or overlaying an entry.
991
992         This function uses ONLY the pointer to the rte (thing) and ignores the other
993         information that symtab foreach function passes (st, entry, and data) which
994         means that it _can_ safetly be used outside of the foreach setting. If
995         the function is changed to depend on any of these three, then a stand-alone
996         rte_cleanup() function should be added and referenced by this, and refererences
997         to this outside of the foreach world should be changed.
998 */
999 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1000         rtable_ent_t*   rte;
1001         int i;
1002
1003         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1004                 return;
1005         }
1006
1007         rte->refs--;
1008         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1009                 return;
1010         }
1011
1012         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1013                 for( i = 0; i < rte->nrrgroups; i++ ) {
1014                         if( rte->rrgroups[i] ) {
1015                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1016                         }
1017                 }
1018
1019                 free( rte->rrgroups );
1020         }
1021
1022         free( rte );                                                                                    // finally, drop the potato
1023 }
1024
1025 /*
1026         Read an entire file into a buffer. We assume for route table files
1027         they will be smallish and so this won't be a problem.
1028         Returns a pointer to the buffer, or nil. Caller must free.
1029         Terminates the buffer with a nil character for string processing.
1030
1031         If we cannot stat the file, we assume it's empty or missing and return
1032         an empty buffer, as opposed to a nil, so the caller can generate defaults
1033         or error if an empty/missing file isn't tolerated.
1034 */
1035 static char* uta_fib( char* fname ) {
1036         struct stat     stats;
1037         off_t           fsize = 8192;   // size of the file
1038         off_t           nread;                  // number of bytes read
1039         int                     fd;
1040         char*           buf;                    // input buffer
1041
1042         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1043                 if( fstat( fd, &stats ) >= 0 ) {
1044                         if( stats.st_size <= 0 ) {                                      // empty file
1045                                 close( fd );
1046                                 fd = -1;
1047                         } else {
1048                                 fsize = stats.st_size;                                          // stat ok, save the file size
1049                         }
1050                 } else {
1051                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1052                 }
1053         }
1054
1055         if( fd < 0 ) {                                                                                  // didn't open or empty
1056                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1057                         return NULL;
1058                 }
1059
1060                 *buf = 0;
1061                 return buf;
1062         }
1063
1064         // add a size limit check here
1065
1066         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1067                 close( fd );
1068                 errno = ENOMEM;
1069                 return NULL;
1070         }
1071
1072         nread = read( fd, buf, fsize );
1073         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1074                 free( buf );
1075                 errno = EFBIG;                                                                                  // likely too much to handle
1076                 close( fd );
1077                 return NULL;
1078         }
1079
1080         buf[nread] = 0;
1081
1082         close( fd );
1083         return buf;
1084 }
1085
1086 /*
1087         Create and initialise a route table; Returns a pointer to the table struct.
1088 */
1089 static route_table_t* uta_rt_init( ) {
1090         route_table_t*  rt;
1091
1092         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1093                 return NULL;
1094         }
1095
1096         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1097                 free( rt );
1098                 return NULL;
1099         }
1100
1101         return rt;
1102 }
1103
1104 /*
1105         Clones one of the spaces in the given table.
1106         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1107         Space is the space in the old table to copy. Space 0 uses an integer key and
1108         references rte structs. All other spaces use a string key and reference endpoints.
1109 */
1110 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
1111         endpoint_t*             ep;             // an endpoint
1112         rtable_ent_t*   rte;    // a route table entry
1113         void*   sst;                    // source symtab
1114         void*   nst;                    // new symtab
1115         thing_list_t things;    // things from the space to copy
1116         int             i;
1117         int             free_on_err = 0;
1118
1119         if( nrt == NULL ) {                             // make a new table if needed
1120                 free_on_err = 1;
1121                 nrt = uta_rt_init();
1122         }
1123
1124         if( srt == NULL ) {             // source was nil, just give back the new table
1125                 return nrt;
1126         }
1127
1128         things.nalloc = 2048;
1129         things.nused = 0;
1130         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1131         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1132         if( things.things == NULL ) {
1133                 if( free_on_err ) {
1134                         free( nrt->hash );
1135                         free( nrt );
1136                         nrt = NULL;
1137                 }
1138
1139                 return nrt;
1140         }
1141
1142         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1143         nst = nrt->hash;
1144
1145         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1146
1147         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1148         for( i = 0; i < things.nused; i++ ) {
1149                 if( space ) {                                                                                           // string key, epoint reference
1150                         ep = (endpoint_t *) things.things[i];
1151                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1152                 } else {
1153                         rte = (rtable_ent_t *) things.things[i];
1154                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1155                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1156                 }
1157         }
1158
1159         free( things.things );
1160         free( (void *) things.names );
1161         return nrt;
1162 }
1163
1164 /*
1165         Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1166         The endpoint and meid entries in the hash must be preserved.
1167 */
1168 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1169         endpoint_t*             ep;                             // an endpoint
1170         rtable_ent_t*   rte;                    // a route table entry
1171         route_table_t*  nrt = NULL;             // new route table
1172         int i;
1173
1174         if( srt == NULL ) {
1175                 return uta_rt_init();           // no source to clone, just return an empty table
1176         }
1177
1178         nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
1179         rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
1180
1181         return nrt;
1182 }
1183
1184 /*
1185         Creates a new route table and then clones  _all_ of the given route table (references 
1186         both endpoints AND the route table entries. Needed to support a partial update where 
1187         some route table entries will not be deleted if not explicitly in the update and when 
1188         we are adding/replacing meid references.
1189 */
1190 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1191         endpoint_t*             ep;                             // an endpoint
1192         rtable_ent_t*   rte;                    // a route table entry
1193         route_table_t*  nrt = NULL;             // new route table
1194         int i;
1195
1196         if( srt == NULL ) {
1197                 return uta_rt_init();           // no source to clone, just return an empty table
1198         }
1199
1200         nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
1201         rt_clone_space( srt, nrt, RT_NAME_SPACE );
1202         rt_clone_space( srt, nrt, RT_ME_SPACE );
1203
1204         return nrt;
1205 }
1206
1207 /*
1208         Given a name, find the endpoint struct in the provided route table.
1209 */
1210 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1211
1212         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1213                 return NULL;
1214         }
1215
1216         return rmr_sym_get( rt->hash, ep_name, 1 );
1217 }
1218
1219 /*
1220         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1221 */
1222 static void uta_rt_drop( route_table_t* rt ) {
1223         if( rt == NULL ) {
1224                 return;
1225         }
1226
1227         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1228         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1229         free( rt );
1230 }
1231
1232 /*
1233         Look up and return the pointer to the endpoint stuct matching the given name.
1234         If not in the hash, a new endpoint is created, added to the hash. Should always
1235         return a pointer.
1236 */
1237 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1238         endpoint_t*     ep;
1239
1240         if( !rt || !ep_name || ! *ep_name ) {
1241                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1242                 errno = EINVAL;
1243                 return NULL;
1244         }
1245
1246         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1247                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1248                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1249                         errno = ENOMEM;
1250                         return NULL;
1251                 }
1252
1253                 ep->notify = 1;                                                         // show notification on first connection failure
1254                 ep->open = 0;                                                           // not connected
1255                 ep->addr = uta_h2ip( ep_name );
1256                 ep->name = strdup( ep_name );
1257                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1258                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1259
1260                 rmr_sym_put( rt->hash, ep_name, 1, ep );
1261         }
1262
1263         return ep;
1264 }
1265
1266
1267 /*
1268         Given a session id and message type build a key that can be used to look up the rte in the route
1269         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1270 */
1271 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1272         uint64_t key;
1273
1274         if( sub_id == UNSET_SUBID ) {
1275                 key = 0xffffffff00000000 | mtype;
1276         } else {
1277                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1278         }
1279
1280         return key;
1281 }
1282
1283 /*
1284         Given a route table and meid string, find the owner (if known). Returns a pointer to
1285         the endpoint struct or nil.
1286 */
1287 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
1288         endpoint_t* ep;         // the ep we found in the hash
1289
1290         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1291                 return NULL;
1292         }
1293
1294         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
1295 }
1296
1297 #endif