Add ability to control route table req frequency
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49 #include <RIC_message_types.h>          // needed for route manager messages
50
51
52 /*
53         Passed to a symtab foreach callback to construct a list of pointers from
54         a current symtab.
55 */
56 typedef struct thing_list {
57         int nalloc;
58         int nused;
59         void** things;
60         const char** names;
61 } thing_list_t;
62
63 // ---- debugging/testing -------------------------------------------------------------------------
64
65 /*
66         Dump some stats for an endpoint in the RT. This is generally called to
67         verify endpoints after a table load/change.
68 */
69 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
70         int*    counter;
71         endpoint_t* ep;
72
73         if( (ep = (endpoint_t *) thing) == NULL ) {
74                 return;
75         }
76
77         if( (counter = (int *) vcounter) != NULL ) {
78                 (*counter)++;
79         }
80
81         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
82 }
83
84 /*
85         Called to count meid entries in the table. The meid points to an 'owning' endpoint
86         so we can list what we find
87 */
88 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
89         int*    counter;
90         endpoint_t* ep;
91
92         if( (ep = (endpoint_t *) thing) == NULL ) {
93                 return;
94         }
95
96         if( (counter = (int *) vcounter) != NULL ) {
97                 (*counter)++;
98         }
99
100         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
101 }
102
103 /*
104         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
105         the 'source' information and is added to each message.
106 */
107 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
108         endpoint_t* ep;
109         char*   id;
110
111         if( (ep = (endpoint_t *) thing) == NULL ) {
112                 return;
113         }
114
115         if( (id = (char *) vid) == NULL ) {
116                 id = "missing";
117         }
118
119         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
120                 (long long) time( NULL ),
121                 id,
122                 ep->name,
123                 ep->open,
124                 ep->scounts[EPSC_GOOD],
125                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
126                 ep->scounts[EPSC_FAIL],
127                 ep->scounts[EPSC_TRANS]   );
128 }
129
130 /*
131         Dump stats for a route entry in the table.
132 */
133 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
134         int*    counter;
135         rtable_ent_t* rte;                      // thing is really an rte
136         int             mtype;
137         int             sid;
138
139         if( (rte = (rtable_ent_t *) thing) == NULL ) {
140                 return;
141         }
142
143         if( (counter = (int *) vcounter) != NULL ) {
144                 (*counter)++;
145         }
146
147         mtype = rte->key & 0xffff;
148         sid = (int) (rte->key >> 32);
149
150         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
151 }
152
153 /*
154         Given a route table, cause some stats to be spit out.
155 */
156 static void  rt_stats( route_table_t* rt ) {
157         int* counter;
158
159         if( rt == NULL ) {
160                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
161                 return;
162         }
163
164         counter = (int *) malloc( sizeof( int ) );
165         *counter = 0;
166         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
167         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
168         rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
169         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
170
171         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
172         *counter = 0;
173         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
174         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
175
176         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
177         *counter = 0;
178         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
179         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
180
181         free( counter );
182 }
183
184 /*
185         Given a route table, cause endpoint counters to be written to stderr. The id
186         parm is written as the "source" in the output.
187 */
188 static void  rt_epcounts( route_table_t* rt, char* id ) {
189         if( rt == NULL ) {
190                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
191                 return;
192         }
193
194         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
195 }
196
197 // ------------ route manager communication -------------------------------------------------
198 /*
199         Send a request for a table update to the route manager. Updates come in
200         async, so send and go.
201
202         pctx is the private context for the thread; ctx is the application context
203         that we need to be able to send the application ID in case rt mgr needs to
204         use it to idenfity us.
205
206         Returns 0 if we were not able to send a request.
207 */
208 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
209         rmr_mbuf_t*     smsg;
210         int     state = 0;
211
212         if( ctx->rtg_whid < 0 ) {
213                 return state;
214         }
215
216         smsg = rmr_alloc_msg( pctx, 1024 );
217         if( smsg != NULL ) {
218                 smsg->mtype = RMRRM_REQ_TABLE;
219                 smsg->sub_id = 0;
220                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
221                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
222                 smsg->len = strlen( smsg->payload ) + 1;
223
224                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
225                 if( (state = smsg->state) != RMR_OK ) {
226                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
227                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
228                         ctx->rtg_whid = -1;
229                 }
230
231                 rmr_free_msg( smsg );
232         }
233
234         return state;
235 }
236
237 /*
238         Send an ack to the route table manager for a table ID that we are
239         processing.      State is 1 for OK, and 0 for failed. Reason might
240         be populated if we know why there was a failure.
241
242         Context should be the PRIVATE context that we use for messages
243         to route manger and NOT the user's context.
244
245         If a message buffere is passed we use that and use return to sender
246         assuming that this might be a response to a call and that is needed
247         to send back to the proper calling thread. If msg is nil, we allocate
248         and use it.
249 */
250 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
251         int             use_rts = 1;
252         int             payload_size = 1024;
253
254         if( ctx == NULL || ctx->rtg_whid < 0 ) {
255                 return;
256         }
257
258         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
259                 return;
260         }
261
262         if( smsg != NULL ) {
263                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
264         } else {
265                 use_rts = 0;
266                 smsg = rmr_alloc_msg( ctx, payload_size );
267         }
268
269         if( smsg != NULL ) {
270                 smsg->mtype = RMRRM_TABLE_STATE;
271                 smsg->sub_id = -1;
272                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
273                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
274
275                 smsg->len = strlen( smsg->payload ) + 1;
276
277                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
278                 if( use_rts ) {
279                         smsg = rmr_rts_msg( ctx, smsg );
280                 } else {
281                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
282                 }
283                 if( (state = smsg->state) != RMR_OK ) {
284                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
285                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
286                         ctx->rtg_whid = -1;
287                 }
288
289                 if( ! use_rts ) {
290                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
291                 }
292         }
293 }
294
295 // ------------------------------------------------------------------------------------------------
296 /*
297         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
298         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
299 */
300 static char* clip( char* buf ) {
301         char*   tok;
302
303         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
304                 buf++;
305         }
306
307         if( (tok = strchr( buf, '#' )) != NULL ) {
308                 if( tok == buf ) {
309                         return buf;                                     // just push back; leading comment sym handled there
310                 }
311
312                 if( isspace( *(tok-1) ) ) {
313                         *tok = 0;
314                 }
315         }
316
317         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
318         *(tok+1) = 0;
319
320         return buf;
321 }
322
323 /*
324         This accepts a pointer to a nil terminated string, and ensures that there is a
325         newline as the last character. If there is not, a new buffer is allocated and
326         the newline is added.  If a new buffer is allocated, the buffer passed in is
327         freed.  The function returns a pointer which the caller should use, and must
328         free.  In the event of an error, a nil pointer is returned.
329 */
330 static char* ensure_nlterm( char* buf ) {
331         char*   nb = NULL;
332         int             len = 1;
333
334         nb = buf;
335         if( buf == NULL || (len = strlen( buf )) < 2 ) {
336                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
337                         *nb = '\n';
338                         *(nb+1) = 0;
339                 }
340         } else {
341                 if( buf[len-1] != '\n' ) {
342                         rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
343                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
344                                 memcpy( nb, buf, len );
345                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
346                                 *(nb+len+1) = 0;
347                         }
348
349                         free( buf );
350                 }
351         }
352
353         return nb;
354 }
355
356 /*
357         Given a message type create a route table entry and add to the hash keyed on the
358         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
359         is the number of group slots to allocate in the entry.
360 */
361 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
362         rtable_ent_t* rte;
363         rtable_ent_t* old_rte;          // entry which was already in the table for the key
364
365         if( rt == NULL ) {
366                 return NULL;
367         }
368
369         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
370                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
371                 return NULL;
372         }
373         memset( rte, 0, sizeof( *rte ) );
374         rte->refs = 1;
375         rte->key = key;
376
377         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
378                 nrrgroups = 10;
379         }
380
381         if( nrrgroups ) {
382                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
383                         free( rte );
384                         return NULL;
385                 }
386                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
387         } else {
388                 rte->rrgroups = NULL;
389         }
390
391         rte->nrrgroups = nrrgroups;
392
393         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
394                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
395         }
396
397         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
398
399         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
400         return rte;
401 }
402
403 /*
404         This accepts partially parsed information from an rte or mse record sent by route manager or read from
405         a file such that:
406                 ts_field is the msg-type,sender field
407                 subid is the integer subscription id
408                 rr_field is the endpoint information for round robening message over
409
410         If all goes well, this will add an RTE to the table under construction.
411
412         The ts_field is checked to see if we should ingest this record. We ingest if one of
413         these is true:
414                 there is no sender info (a generic entry for all)
415                 there is sender and our host:port matches one of the senders
416                 the sender info is an IP address that matches one of our IP addresses
417 */
418 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
419         rtable_ent_t*   rte;            // route table entry added
420         char*   tok;
421         int             ntoks;
422         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
423         char*   tokens[128];
424         char*   gtokens[64];
425         int             i;
426         int             ngtoks;                         // number of tokens in the group list
427         int             grp;                            // index into group list
428
429         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
430         rr_field = clip( rr_field );
431
432         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
433                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
434                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
435
436                 key = build_rt_key( subid, atoi( ts_field ) );
437
438                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
439
440                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
441                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
442                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
443                         }
444                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
445                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
446
447                         for( grp = 0; grp < ngtoks; grp++ ) {
448                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
449                                         for( i = 0; i < ntoks; i++ ) {
450                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
451                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
452                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
453                                                 }
454                                         }
455                                 }
456                         }
457                 }
458         } else {
459                 if( DEBUG || (vlevel > 2) ) {
460                         rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
461                 }
462         }
463 }
464
465 /*
466         Trash_entry takes a partially parsed record from the input and
467         will delete the entry if the sender,mtype matches us or it's a
468         generic mtype. The refernce in the new table is removed and the
469         refcounter for the actual rte is decreased. If that ref count is
470         0 then the memory is freed (handled byh the del_rte call).
471 */
472 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
473         rtable_ent_t*   rte;            // route table entry to be 'deleted'
474         char*   tok;
475         int             ntoks;
476         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
477         char*   tokens[128];
478
479         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
480                 return;
481         }
482
483         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
484
485         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
486                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
487                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
488
489                 key = build_rt_key( subid, atoi( ts_field ) );
490                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
491                 if( rte != NULL ) {
492                         if( DEBUG || (vlevel > 1) ) {
493                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
494                         }
495                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
496                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
497                 } else {
498                         if( DEBUG || (vlevel > 1) ) {
499                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
500                         }
501                 }
502         } else {
503                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
504         }
505 }
506
507 /*
508         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
509         the 'owner' which should be the dns name or IP address of an enpoint
510         the meid_list is a space separated list of me IDs
511
512         This function assumes the caller has vetted the pointers as needed.
513
514         For each meid in the list, an entry is pushed into the hash which references the owner
515         endpoint such that when the meid is used to route a message it references the endpoint
516         to send messages to.
517 */
518 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
519         char*   tok;
520         int             ntoks;
521         char*   tokens[128];
522         int             i;
523         int             state;
524         endpoint_t*     ep;                                             // endpoint struct for the owner
525
526         owner = clip( owner );                          // ditch extra whitespace and trailing comments
527         meid_list = clip( meid_list );
528
529         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
530         for( i = 0; i < ntoks; i++ ) {
531                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
532                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
533                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
534                 } else {
535                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
536                 }
537         }
538 }
539
540 /*
541         Given the tokens from an mme_del, delete the listed meid entries from the new
542         table. The list is a space separated list of meids.
543
544         The meids in the hash reference endpoints which are never deleted and so
545         the only thing that we need to do here is to remove the meid from the hash.
546
547         This function assumes the caller has vetted the pointers as needed.
548 */
549 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
550         char*   tok;
551         int             ntoks;
552         char*   tokens[128];
553         int             i;
554
555         if( rtab->hash == NULL ) {
556                 return;
557         }
558
559         meid_list = clip( meid_list );
560
561         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
562         for( i = 0; i < ntoks; i++ ) {
563                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
564                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
565         }
566 }
567
568 /*
569         Parse a partially parsed meid record. Tokens[0] should be one of:
570                 meid_map, mme_ar, mme_del.
571
572         pctx is the private context needed to return an ack/nack using the provided
573         message buffer with the route managers address info.
574 */
575 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
576         char wbuf[1024];
577
578         if( tokens == NULL || ntoks < 1 ) {
579                 return;                                                 // silent but should never happen
580         }
581
582         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
583                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
584                 return;
585         }
586
587         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
588                 tokens[1] = clip( tokens[1] );
589                 if( *(tokens[1]) == 's' ) {
590                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
591                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
592                                 uta_rt_drop( ctx->new_rtable );
593                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as end never made it
594                         }
595
596                         if( ctx->table_id != NULL ) {
597                                 free( ctx->table_id );
598                         }
599                         if( ntoks >2 ) {
600                                 ctx->table_id = strdup( clip( tokens[2] ) );
601                         } else {
602                                 ctx->table_id = NULL;
603                         }
604                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
605                         ctx->new_rtable->mupdates = 0;
606                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
607                 } else {
608                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
609                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
610                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
611                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
612                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
613                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
614                                                 uta_rt_drop( ctx->new_rtable );
615                                                 ctx->new_rtable = NULL;
616                                                 return;
617                                         }
618
619                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
620                                 }
621
622                                 if( ctx->new_rtable ) {
623                                         uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
624                                         ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
625                                         ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
626                                         ctx->new_rtable = NULL;
627                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
628                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
629
630                                         if( vlevel > 0 ) {
631                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
632                                                 rt_stats( ctx->old_rtable );
633                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
634                                                 rt_stats( ctx->rtable );
635                                         }
636                                 } else {
637                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
638                                         ctx->new_rtable = NULL;
639                                 }
640                         }
641                 }
642
643                 return;
644         }
645
646         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
647                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
648                 return;
649         }
650
651         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
652                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
653                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
654                         return;
655                 }
656                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
657                 ctx->new_rtable->mupdates++;
658         }
659
660         if( strcmp( tokens[0], "mme_del" ) == 0 ) {
661                 if( ntoks < 2 ) {
662                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
663                         return;
664                 }
665                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
666                 ctx->new_rtable->mupdates++;
667         }
668 }
669
670 /*
671         Parse a single record recevied from the route table generator, or read
672         from a static route table file.  Start records cause a new table to
673         be started (if a partial table was received it is discarded. Table
674         entry records are added to the currenly 'in progress' table, and an
675         end record causes the in progress table to be finalised and the
676         currently active table is replaced.
677
678         The updated table will be activated when the *|end record is encountered.
679         However, to allow for a "double" update, where both the meid map and the
680         route table must be updated at the same time, the end indication on a
681         route table (new or update) may specifiy "hold" which indicates that meid
682         map entries are to follow and the updated route table should be held as
683         pending until the end of the meid map is received and validated.
684
685         CAUTION:  we are assuming that there is a single route/meid map generator
686                 and as such only one type of update is received at a time; in other
687                 words, the sender cannot mix update records and if there is more than
688                 one sender process they must synchronise to avoid issues.
689
690
691         For a RT update, we expect:
692                 newrt | start | <table-id>
693                 newrt | end | <count>
694                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
695                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
696                 mse| <mtype>[,sender] | <sub-id> | %meid
697
698
699         For a meid map update we expect:
700                 meid_map | start | <table-id>
701                 meid_map | end | <count> | <md5-hash>
702                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
703                 mme_del | <meid0> <meid1>...<meidn>
704
705
706         The pctx is our private context that must be used to send acks/status
707         messages back to the route manager.  The regular ctx is the ctx that
708         the user has been given and thus that's where we have to hang the route
709         table we're working with.
710
711         If mbuf is given, and we need to ack, then we ack using the mbuf and a
712         return to sender call (allows route manager to use wh_call() to send
713         an update and rts is required to get that back to the right thread).
714         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
715         will be used.
716 */
717 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
718         int i;
719         int ntoks;                                                      // number of tokens found in something
720         int ngtoks;
721         int     grp;                                                    // group number
722         rtable_ent_t*   rte;                            // route table entry added
723         char*   tokens[128];
724         char*   tok;                                            // pointer into a token or string
725         char    wbuf[1024];
726
727         if( ! buf ) {
728                 return;
729         }
730
731         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
732                 buf++;
733         }
734         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
735         *(tok+1) = 0;
736
737         memset( tokens, 0, sizeof( tokens ) );
738         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
739                 tokens[0] = clip( tokens[0] );
740                 switch( *(tokens[0]) ) {
741                         case 0:                                                                                                 // ignore blanks
742                                 // fallthrough
743                         case '#':                                                                                               // and comment lines
744                                 break;
745
746                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
747                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
748                                         break;
749                                 }
750
751                                 if( ntoks < 3 ) {
752                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
753                                         break;
754                                 }
755
756                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
757                                 ctx->new_rtable->updates++;
758                                 break;
759
760                         case 'n':                                                                                               // newrt|{start|end}
761                                 tokens[1] = clip( tokens[1] );
762                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
763                                         if( ntoks >2 ) {
764                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
765                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
766                                                                 ctx->new_rtable->updates, tokens[2] );
767                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
768                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
769                                                         uta_rt_drop( ctx->new_rtable );
770                                                         ctx->new_rtable = NULL;
771                                                         break;
772                                                 }
773                                         }
774
775                                         if( ctx->new_rtable ) {
776                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
777                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
778                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
779                                                 ctx->new_rtable = NULL;
780                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
781
782                                                 if( vlevel > 0 ) {
783                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
784                                                         rt_stats( ctx->old_rtable );
785                                                         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
786                                                         rt_stats( ctx->rtable );
787                                                 }
788
789                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
790                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
791                                         } else {
792                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
793                                                 ctx->new_rtable = NULL;
794                                         }
795                                 } else {                                                                                                                        // start a new table.
796                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
797                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
798
799                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
800                                                 uta_rt_drop( ctx->new_rtable );
801                                         }
802
803                                         if( ctx->table_id != NULL ) {
804                                                 free( ctx->table_id );
805                                         }
806                                         if( ntoks >2 ) {
807                                                 ctx->table_id = strdup( clip( tokens[2] ) );
808                                         } else {
809                                                 ctx->table_id = NULL;
810                                         }
811
812                                         ctx->new_rtable = NULL;
813                                         ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
814                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
815                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
816                                 }
817                                 break;
818
819                         case 'm':                                                                       // mse entry or one of the meid_ records
820                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
821                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
822                                                 break;
823                                         }
824
825                                         if( ntoks < 4 ) {
826                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
827                                                 break;
828                                         }
829
830                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
831                                         ctx->new_rtable->updates++;
832                                 } else {
833                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
834                                 }
835                                 break;
836
837                         case 'r':                                       // assume rt entry
838                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
839                                         break;
840                                 }
841
842                                 ctx->new_rtable->updates++;
843                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
844                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
845                                 } else {
846                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
847                                 }
848                                 break;
849
850                         case 'u':                                                                                               // update current table, not a total replacement
851                                 tokens[1] = clip( tokens[1] );
852                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
853                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
854                                                 break;
855                                         }
856
857                                         if( ntoks >2 ) {
858                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
859                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
860                                                                 ctx->new_rtable->updates, tokens[2] );
861                                                         uta_rt_drop( ctx->new_rtable );
862                                                         ctx->new_rtable = NULL;
863                                                         break;
864                                                 }
865                                         }
866
867                                         if( ctx->new_rtable ) {
868                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
869                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
870                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
871                                                 ctx->new_rtable = NULL;
872                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
873
874                                                 if( vlevel > 0 ) {
875                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
876                                                         rt_stats( ctx->old_rtable );
877                                                         rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
878                                                         rt_stats( ctx->rtable );
879                                                 }
880                                         } else {
881                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
882                                                 ctx->new_rtable = NULL;
883                                         }
884                                 } else {                                                                                        // start a new table.
885                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
886                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
887                                                 uta_rt_drop( ctx->new_rtable );
888                                         }
889
890                                         if( ntoks >2 ) {
891                                                 if( ctx->table_id != NULL ) {
892                                                         free( ctx->table_id );
893                                                 }
894                                                 ctx->table_id = strdup( clip( tokens[2] ) );
895                                         }
896
897                                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
898                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
899                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
900                                 }
901                                 break;
902
903                         default:
904                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
905                                 break;
906                 }
907         }
908 }
909
910 /*
911         This function attempts to open a static route table in order to create a 'seed'
912         table during initialisation.  The environment variable RMR_SEED_RT is expected
913         to contain the necessary path to the file. If missing, or if the file is empty,
914         no route table will be available until one is received from the generator.
915
916         This function is probably most useful for testing situations, or extreme
917         cases where the routes are static.
918 */
919 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
920         int             i;
921         char*   fname;
922         char*   fbuf;                           // buffer with file contents
923         char*   rec;                            // start of the record
924         char*   eor;                            // end of the record
925         int             rcount = 0;                     // record count for debug
926
927         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
928                 return;
929         }
930
931         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
932                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
933                 return;
934         }
935
936         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
937         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
938                 if( *eor == '\r' ) {
939                         *eor = '\n';                                                            // will look like a blank line which is ok
940                 }
941         }
942
943         rec = fbuf;
944         while( rec && *rec ) {
945                 rcount++;
946                 if( (eor = strchr( rec, '\n' )) != NULL ) {
947                         *eor = 0;
948                 } else {
949                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
950                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
951                         free( fbuf );
952                         return;
953                 }
954
955                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
956
957                 rec = eor+1;
958         }
959
960         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
961         free( fbuf );
962 }
963
964 /*
965         Callback driven for each named thing in a symtab. We collect the pointers to those
966         things for later use (cloning).
967 */
968 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
969         thing_list_t*   tl;
970
971         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
972                 return;
973         }
974
975         if( thing == NULL ) {
976                 return;
977         }
978
979         tl->names[tl->nused] = name;                    // the name/key
980         tl->things[tl->nused++] = thing;                // save a reference to the thing
981 }
982
983 /*
984         Called to delete a route table entry struct. We delete the array of endpoint
985         pointers, but NOT the endpoints referenced as those are referenced from
986         multiple entries.
987
988         Route table entries can be concurrently referenced by multiple symtabs, so
989         the actual delete happens only if decrementing the rte's ref count takes it
990         to 0. Thus, it is safe to call this function across a symtab when cleaning up
991         the symtab, or overlaying an entry.
992
993         This function uses ONLY the pointer to the rte (thing) and ignores the other
994         information that symtab foreach function passes (st, entry, and data) which
995         means that it _can_ safetly be used outside of the foreach setting. If
996         the function is changed to depend on any of these three, then a stand-alone
997         rte_cleanup() function should be added and referenced by this, and refererences
998         to this outside of the foreach world should be changed.
999 */
1000 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1001         rtable_ent_t*   rte;
1002         int i;
1003
1004         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1005                 return;
1006         }
1007
1008         rte->refs--;
1009         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1010                 return;
1011         }
1012
1013         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1014                 for( i = 0; i < rte->nrrgroups; i++ ) {
1015                         if( rte->rrgroups[i] ) {
1016                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1017                         }
1018                 }
1019
1020                 free( rte->rrgroups );
1021         }
1022
1023         free( rte );                                                                                    // finally, drop the potato
1024 }
1025
1026 /*
1027         Read an entire file into a buffer. We assume for route table files
1028         they will be smallish and so this won't be a problem.
1029         Returns a pointer to the buffer, or nil. Caller must free.
1030         Terminates the buffer with a nil character for string processing.
1031
1032         If we cannot stat the file, we assume it's empty or missing and return
1033         an empty buffer, as opposed to a nil, so the caller can generate defaults
1034         or error if an empty/missing file isn't tolerated.
1035 */
1036 static char* uta_fib( char* fname ) {
1037         struct stat     stats;
1038         off_t           fsize = 8192;   // size of the file
1039         off_t           nread;                  // number of bytes read
1040         int                     fd;
1041         char*           buf;                    // input buffer
1042
1043         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1044                 if( fstat( fd, &stats ) >= 0 ) {
1045                         if( stats.st_size <= 0 ) {                                      // empty file
1046                                 close( fd );
1047                                 fd = -1;
1048                         } else {
1049                                 fsize = stats.st_size;                                          // stat ok, save the file size
1050                         }
1051                 } else {
1052                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1053                 }
1054         }
1055
1056         if( fd < 0 ) {                                                                                  // didn't open or empty
1057                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1058                         return NULL;
1059                 }
1060
1061                 *buf = 0;
1062                 return buf;
1063         }
1064
1065         // add a size limit check here
1066
1067         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1068                 close( fd );
1069                 errno = ENOMEM;
1070                 return NULL;
1071         }
1072
1073         nread = read( fd, buf, fsize );
1074         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1075                 free( buf );
1076                 errno = EFBIG;                                                                                  // likely too much to handle
1077                 close( fd );
1078                 return NULL;
1079         }
1080
1081         buf[nread] = 0;
1082
1083         close( fd );
1084         return buf;
1085 }
1086
1087 /*
1088         Create and initialise a route table; Returns a pointer to the table struct.
1089 */
1090 static route_table_t* uta_rt_init( ) {
1091         route_table_t*  rt;
1092
1093         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1094                 return NULL;
1095         }
1096         memset( rt, 0, sizeof( *rt ) );
1097
1098         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1099                 free( rt );
1100                 return NULL;
1101         }
1102
1103         return rt;
1104 }
1105
1106 /*
1107         Clones one of the spaces in the given table.
1108         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1109         Space is the space in the old table to copy. Space 0 uses an integer key and
1110         references rte structs. All other spaces use a string key and reference endpoints.
1111 */
1112 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
1113         endpoint_t*             ep;             // an endpoint
1114         rtable_ent_t*   rte;    // a route table entry
1115         void*   sst;                    // source symtab
1116         void*   nst;                    // new symtab
1117         thing_list_t things;    // things from the space to copy
1118         int             i;
1119         int             free_on_err = 0;
1120
1121         if( nrt == NULL ) {                             // make a new table if needed
1122                 free_on_err = 1;
1123                 nrt = uta_rt_init();
1124         }
1125
1126         if( srt == NULL ) {             // source was nil, just give back the new table
1127                 return nrt;
1128         }
1129
1130         things.nalloc = 2048;
1131         things.nused = 0;
1132         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1133         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1134         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1135         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1136         if( things.things == NULL ) {
1137                 if( free_on_err ) {
1138                         free( nrt->hash );
1139                         free( nrt );
1140                         nrt = NULL;
1141                 }
1142
1143                 return nrt;
1144         }
1145
1146         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1147         nst = nrt->hash;
1148
1149         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1150
1151         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1152         for( i = 0; i < things.nused; i++ ) {
1153                 if( space ) {                                                                                           // string key, epoint reference
1154                         ep = (endpoint_t *) things.things[i];
1155                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1156                 } else {
1157                         rte = (rtable_ent_t *) things.things[i];
1158                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1159                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1160                 }
1161         }
1162
1163         free( things.things );
1164         free( (void *) things.names );
1165         return nrt;
1166 }
1167
1168 /*
1169         Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1170         The endpoint and meid entries in the hash must be preserved.
1171 */
1172 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1173         endpoint_t*             ep;                             // an endpoint
1174         rtable_ent_t*   rte;                    // a route table entry
1175         route_table_t*  nrt = NULL;             // new route table
1176         int i;
1177
1178         if( srt == NULL ) {
1179                 return uta_rt_init();           // no source to clone, just return an empty table
1180         }
1181
1182         nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
1183         rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
1184
1185         return nrt;
1186 }
1187
1188 /*
1189         Creates a new route table and then clones  _all_ of the given route table (references
1190         both endpoints AND the route table entries. Needed to support a partial update where
1191         some route table entries will not be deleted if not explicitly in the update and when
1192         we are adding/replacing meid references.
1193 */
1194 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1195         endpoint_t*             ep;                             // an endpoint
1196         rtable_ent_t*   rte;                    // a route table entry
1197         route_table_t*  nrt = NULL;             // new route table
1198         int i;
1199
1200         if( srt == NULL ) {
1201                 return uta_rt_init();           // no source to clone, just return an empty table
1202         }
1203
1204         nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
1205         rt_clone_space( srt, nrt, RT_NAME_SPACE );
1206         rt_clone_space( srt, nrt, RT_ME_SPACE );
1207
1208         return nrt;
1209 }
1210
1211 /*
1212         Given a name, find the endpoint struct in the provided route table.
1213 */
1214 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1215
1216         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1217                 return NULL;
1218         }
1219
1220         return rmr_sym_get( rt->hash, ep_name, 1 );
1221 }
1222
1223 /*
1224         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1225 */
1226 static void uta_rt_drop( route_table_t* rt ) {
1227         if( rt == NULL ) {
1228                 return;
1229         }
1230
1231         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1232         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1233         free( rt );
1234 }
1235
1236 /*
1237         Look up and return the pointer to the endpoint stuct matching the given name.
1238         If not in the hash, a new endpoint is created, added to the hash. Should always
1239         return a pointer.
1240 */
1241 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1242         endpoint_t*     ep;
1243
1244         if( !rt || !ep_name || ! *ep_name ) {
1245                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1246                 errno = EINVAL;
1247                 return NULL;
1248         }
1249
1250         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1251                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1252                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1253                         errno = ENOMEM;
1254                         return NULL;
1255                 }
1256
1257                 ep->notify = 1;                                                         // show notification on first connection failure
1258                 ep->open = 0;                                                           // not connected
1259                 ep->addr = uta_h2ip( ep_name );
1260                 ep->name = strdup( ep_name );
1261                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1262                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1263
1264                 rmr_sym_put( rt->hash, ep_name, 1, ep );
1265         }
1266
1267         return ep;
1268 }
1269
1270
1271 /*
1272         Given a session id and message type build a key that can be used to look up the rte in the route
1273         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1274 */
1275 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1276         uint64_t key;
1277
1278         if( sub_id == UNSET_SUBID ) {
1279                 key = 0xffffffff00000000 | mtype;
1280         } else {
1281                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1282         }
1283
1284         return key;
1285 }
1286
1287 /*
1288         Given a route table and meid string, find the owner (if known). Returns a pointer to
1289         the endpoint struct or nil.
1290 */
1291 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
1292         endpoint_t* ep;         // the ep we found in the hash
1293
1294         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1295                 return NULL;
1296         }
1297
1298         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1299 }
1300
1301 #endif