185dbb1f178a407ca816076c6d8ab266c68d7b4b
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49 #include <RIC_message_types.h>          // needed for route manager messages
50
51
52 /*
53         Passed to a symtab foreach callback to construct a list of pointers from
54         a current symtab.
55 */
56 typedef struct thing_list {
57         int nalloc;
58         int nused;
59         void** things;
60         const char** names;
61 } thing_list_t;
62
63 // ---- debugging/testing -------------------------------------------------------------------------
64
65 /*
66         Dump some stats for an endpoint in the RT. This is generally called to
67         verify endpoints after a table load/change.
68 */
69 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
70         int*    counter;
71         endpoint_t* ep;
72
73         if( (ep = (endpoint_t *) thing) == NULL ) {
74                 return;
75         }
76
77         if( (counter = (int *) vcounter) != NULL ) {
78                 (*counter)++;
79         }
80
81         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
82 }
83
84 /*
85         Called to count meid entries in the table. The meid points to an 'owning' endpoint
86         so we can list what we find
87 */
88 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
89         int*    counter;
90         endpoint_t* ep;
91
92         if( (ep = (endpoint_t *) thing) == NULL ) {
93                 return;
94         }
95
96         if( (counter = (int *) vcounter) != NULL ) {
97                 (*counter)++;
98         }
99
100         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
101 }
102
103 /*
104         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
105         the 'source' information and is added to each message.
106 */
107 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
108         endpoint_t* ep;
109         char*   id;
110
111         if( (ep = (endpoint_t *) thing) == NULL ) {
112                 return;
113         }
114
115         if( (id = (char *) vid) == NULL ) {
116                 id = "missing";
117         }
118
119         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
120                 (long long) time( NULL ),
121                 id,
122                 ep->name,
123                 ep->open,
124                 ep->scounts[EPSC_GOOD],
125                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
126                 ep->scounts[EPSC_FAIL],
127                 ep->scounts[EPSC_TRANS]   );
128 }
129
130 /*
131         Dump stats for a route entry in the table.
132 */
133 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
134         int*    counter;
135         rtable_ent_t* rte;                      // thing is really an rte
136         int             mtype;
137         int             sid;
138
139         if( (rte = (rtable_ent_t *) thing) == NULL ) {
140                 return;
141         }
142
143         if( (counter = (int *) vcounter) != NULL ) {
144                 (*counter)++;
145         }
146
147         mtype = rte->key & 0xffff;
148         sid = (int) (rte->key >> 32);
149
150         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
151 }
152
153 /*
154         Given a route table, cause some stats to be spit out.
155 */
156 static void  rt_stats( route_table_t* rt ) {
157         int* counter;
158
159         if( rt == NULL ) {
160                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
161                 return;
162         }
163
164         counter = (int *) malloc( sizeof( int ) );
165         *counter = 0;
166         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
167         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
168         rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
169         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
170
171         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
172         *counter = 0;
173         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
174         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
175
176         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
177         *counter = 0;
178         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
179         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
180
181         free( counter );
182 }
183
184 /*
185         Given a route table, cause endpoint counters to be written to stderr. The id
186         parm is written as the "source" in the output.
187 */
188 static void  rt_epcounts( route_table_t* rt, char* id ) {
189         if( rt == NULL ) {
190                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
191                 return;
192         }
193
194         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
195 }
196
197 // ------------ route manager communication -------------------------------------------------
198 /*
199         Send a request for a table update to the route manager. Updates come in
200         async, so send and go.
201
202         pctx is the private context for the thread; ctx is the application context
203         that we need to be able to send the application ID in case rt mgr needs to
204         use it to idenfity us.
205
206         Returns 0 if we were not able to send a request.
207 */
208 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
209         rmr_mbuf_t*     smsg;
210         int     state = 0;
211
212         if( ctx->rtg_whid < 0 ) {
213                 return state;
214         }
215
216         smsg = rmr_alloc_msg( pctx, 1024 );
217         if( smsg != NULL ) {
218                 smsg->mtype = RMRRM_REQ_TABLE;
219                 smsg->sub_id = 0;
220                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
221                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
222                 smsg->len = strlen( smsg->payload ) + 1;
223         
224                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
225                 if( (state = smsg->state) != RMR_OK ) {
226                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
227                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
228                         ctx->rtg_whid = -1;
229                 }
230
231                 rmr_free_msg( smsg );
232         }
233
234         return state;
235 }
236
237 /*
238         Send an ack to the route table manager for a table ID that we are
239         processing.      State is 1 for OK, and 0 for failed. Reason might 
240         be populated if we know why there was a failure.
241
242         Context should be the PRIVATE context that we use for messages
243         to route manger and NOT the user's context.
244
245         If a message buffere is passed we use that and use return to sender
246         assuming that this might be a response to a call and that is needed
247         to send back to the proper calling thread. If msg is nil, we allocate
248         and use it.
249 */
250 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
251         int             use_rts = 1;
252         int             payload_size = 1024;
253         
254         if( ctx == NULL || ctx->rtg_whid < 0 ) {
255                 return;
256         }
257
258         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
259                 return;
260         }
261
262         if( smsg != NULL ) {
263                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
264         } else {
265                 use_rts = 0;
266                 smsg = rmr_alloc_msg( ctx, payload_size );
267         }
268
269         if( smsg != NULL ) {
270                 smsg->mtype = RMRRM_TABLE_STATE;
271                 smsg->sub_id = -1;
272                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
273                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
274
275                 smsg->len = strlen( smsg->payload ) + 1;
276         
277                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
278                 if( use_rts ) {
279                         smsg = rmr_rts_msg( ctx, smsg );
280                 } else {
281                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
282                 }
283                 if( (state = smsg->state) != RMR_OK ) {
284                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
285                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
286                         ctx->rtg_whid = -1;
287                 }
288
289                 if( ! use_rts ) {
290                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
291                 }
292         }
293 }
294
295 // ------------------------------------------------------------------------------------------------
296 /*
297         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
298         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
299 */
300 static char* clip( char* buf ) {
301         char*   tok;
302
303         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
304                 buf++;
305         }
306
307         if( (tok = strchr( buf, '#' )) != NULL ) {
308                 if( tok == buf ) {
309                         return buf;                                     // just push back; leading comment sym handled there
310                 }
311
312                 if( isspace( *(tok-1) ) ) {
313                         *tok = 0;
314                 }
315         }
316
317         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
318         *(tok+1) = 0;
319
320         return buf;
321 }
322
323 /*
324         This accepts a pointer to a nil terminated string, and ensures that there is a
325         newline as the last character. If there is not, a new buffer is allocated and
326         the newline is added.  If a new buffer is allocated, the buffer passed in is
327         freed.  The function returns a pointer which the caller should use, and must
328         free.  In the event of an error, a nil pointer is returned.
329 */
330 static char* ensure_nlterm( char* buf ) {
331         char*   nb = NULL;
332         int             len = 1;
333
334         nb = buf;
335         if( buf == NULL || (len = strlen( buf )) < 2 ) {
336                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
337                         *nb = '\n';
338                         *(nb+1) = 0;
339                 }
340         } else {
341                 if( buf[len-1] != '\n' ) {
342                         rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
343                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
344                                 memcpy( nb, buf, len );
345                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
346                                 *(nb+len+1) = 0;
347                         }
348
349                         free( buf );
350                 }
351         }
352
353         return nb;
354 }
355
356 /*
357         Given a message type create a route table entry and add to the hash keyed on the
358         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
359         is the number of group slots to allocate in the entry.
360 */
361 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
362         rtable_ent_t* rte;
363         rtable_ent_t* old_rte;          // entry which was already in the table for the key
364
365         if( rt == NULL ) {
366                 return NULL;
367         }
368
369         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
370                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
371                 return NULL;
372         }
373         memset( rte, 0, sizeof( *rte ) );
374         rte->refs = 1;
375         rte->key = key;
376
377         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
378                 nrrgroups = 10;
379         }
380
381         if( nrrgroups ) {
382                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
383                         free( rte );
384                         return NULL;
385                 }
386                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
387         } else {
388                 rte->rrgroups = NULL;
389         }
390
391         rte->nrrgroups = nrrgroups;
392
393         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
394                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
395         }
396
397         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
398
399         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
400         return rte;
401 }
402
403 /*
404         This accepts partially parsed information from an rte or mse record sent by route manager or read from
405         a file such that:
406                 ts_field is the msg-type,sender field
407                 subid is the integer subscription id
408                 rr_field is the endpoint information for round robening message over
409
410         If all goes well, this will add an RTE to the table under construction.
411
412         The ts_field is checked to see if we should ingest this record. We ingest if one of
413         these is true:
414                 there is no sender info (a generic entry for all)
415                 there is sender and our host:port matches one of the senders
416                 the sender info is an IP address that matches one of our IP addresses
417 */
418 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
419         rtable_ent_t*   rte;            // route table entry added
420         char*   tok;
421         int             ntoks;
422         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
423         char*   tokens[128];
424         char*   gtokens[64];
425         int             i;
426         int             ngtoks;                         // number of tokens in the group list
427         int             grp;                            // index into group list
428
429         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
430         rr_field = clip( rr_field );
431
432         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
433                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
434                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
435
436                 key = build_rt_key( subid, atoi( ts_field ) );
437
438                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
439
440                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
441                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
442                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
443                         }
444                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
445                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
446
447                         for( grp = 0; grp < ngtoks; grp++ ) {
448                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
449                                         for( i = 0; i < ntoks; i++ ) {
450                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
451                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
452                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
453                                                 }
454                                         }
455                                 }
456                         }
457                 }
458         } else {
459                 if( DEBUG || (vlevel > 2) ) {
460                         rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
461                 }
462         }
463 }
464
465 /*
466         Trash_entry takes a partially parsed record from the input and
467         will delete the entry if the sender,mtype matches us or it's a
468         generic mtype. The refernce in the new table is removed and the
469         refcounter for the actual rte is decreased. If that ref count is
470         0 then the memory is freed (handled byh the del_rte call).
471 */
472 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
473         rtable_ent_t*   rte;            // route table entry to be 'deleted'
474         char*   tok;
475         int             ntoks;
476         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
477         char*   tokens[128];
478
479         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
480                 return;
481         }
482
483         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
484
485         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
486                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
487                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
488
489                 key = build_rt_key( subid, atoi( ts_field ) );
490                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
491                 if( rte != NULL ) {
492                         if( DEBUG || (vlevel > 1) ) {
493                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
494                         }
495                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
496                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
497                 } else {
498                         if( DEBUG || (vlevel > 1) ) {
499                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
500                         }
501                 }
502         } else {
503                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
504         }
505 }
506
507 /*
508         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
509         the 'owner' which should be the dns name or IP address of an enpoint
510         the meid_list is a space separated list of me IDs
511
512         This function assumes the caller has vetted the pointers as needed.
513
514         For each meid in the list, an entry is pushed into the hash which references the owner
515         endpoint such that when the meid is used to route a message it references the endpoint
516         to send messages to.
517 */
518 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
519         char*   tok;
520         int             ntoks;
521         char*   tokens[128];
522         int             i;
523         int             state;
524         endpoint_t*     ep;                                             // endpoint struct for the owner
525
526         owner = clip( owner );                          // ditch extra whitespace and trailing comments
527         meid_list = clip( meid_list );
528
529         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
530         for( i = 0; i < ntoks; i++ ) {
531                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
532                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
533                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
534                 } else {
535                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
536                 }
537         }
538 }
539
540 /*
541         Given the tokens from an mme_del, delete the listed meid entries from the new
542         table. The list is a space separated list of meids.
543
544         The meids in the hash reference endpoints which are never deleted and so
545         the only thing that we need to do here is to remove the meid from the hash.
546
547         This function assumes the caller has vetted the pointers as needed.
548 */
549 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
550         char*   tok;
551         int             ntoks;
552         char*   tokens[128];
553         int             i;
554
555         if( rtab->hash == NULL ) {
556                 return;
557         }
558
559         meid_list = clip( meid_list );
560
561         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
562         for( i = 0; i < ntoks; i++ ) {
563                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
564                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
565         }
566 }
567
568 /*
569         Parse a partially parsed meid record. Tokens[0] should be one of:
570                 meid_map, mme_ar, mme_del.
571
572         pctx is the private context needed to return an ack/nack using the provided
573         message buffer with the route managers address info.
574 */
575 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
576         char wbuf[1024];
577
578         if( tokens == NULL || ntoks < 1 ) {
579                 return;                                                 // silent but should never happen
580         }
581
582         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
583                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
584                 return;
585         }
586
587         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
588                 tokens[1] = clip( tokens[1] );
589                 if( *(tokens[1]) == 's' ) {
590                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
591                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
592                                 uta_rt_drop( ctx->new_rtable );
593                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as end never made it
594                         }
595
596                         if( ctx->table_id != NULL ) {
597                                 free( ctx->table_id );
598                         }
599                         if( ntoks >2 ) {
600                                 ctx->table_id = strdup( clip( tokens[2] ) );
601                         } else {
602                                 ctx->table_id = NULL;
603                         }
604                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
605                         ctx->new_rtable->mupdates = 0;
606                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
607                 } else {
608                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
609                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
610                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
611                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
612                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
613                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
614                                                 uta_rt_drop( ctx->new_rtable );
615                                                 ctx->new_rtable = NULL;
616                                                 return;
617                                         }
618
619                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
620                                 }
621
622                                 if( ctx->new_rtable ) {
623                                         uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
624                                         ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
625                                         ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
626                                         ctx->new_rtable = NULL;
627                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
628                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
629
630                                         if( vlevel > 0 ) {
631                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
632                                                 rt_stats( ctx->old_rtable );
633                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
634                                                 rt_stats( ctx->rtable );
635                                         }
636                                 } else {
637                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
638                                         ctx->new_rtable = NULL;
639                                 }
640                         }
641                 }
642
643                 return;
644         }       
645
646         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
647                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
648                 return;
649         }
650
651         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
652                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
653                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
654                         return;
655                 }
656                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
657                 ctx->new_rtable->mupdates++;
658         }
659
660         if( strcmp( tokens[0], "mme_del" ) == 0 ) {
661                 if( ntoks < 2 ) {
662                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
663                         return;
664                 }
665                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
666                 ctx->new_rtable->mupdates++;
667         }
668 }
669
670 /*
671         Parse a single record recevied from the route table generator, or read
672         from a static route table file.  Start records cause a new table to
673         be started (if a partial table was received it is discarded. Table
674         entry records are added to the currenly 'in progress' table, and an
675         end record causes the in progress table to be finalised and the
676         currently active table is replaced.
677
678         The updated table will be activated when the *|end record is encountered.
679         However, to allow for a "double" update, where both the meid map and the
680         route table must be updated at the same time, the end indication on a
681         route table (new or update) may specifiy "hold" which indicates that meid
682         map entries are to follow and the updated route table should be held as
683         pending until the end of the meid map is received and validated.
684
685         CAUTION:  we are assuming that there is a single route/meid map generator
686                 and as such only one type of update is received at a time; in other
687                 words, the sender cannot mix update records and if there is more than
688                 one sender process they must synchronise to avoid issues.
689
690
691         For a RT update, we expect:
692                 newrt | start | <table-id>
693                 newrt | end | <count>
694                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
695                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
696                 mse| <mtype>[,sender] | <sub-id> | %meid
697
698
699         For a meid map update we expect:
700                 meid_map | start | <table-id>
701                 meid_map | end | <count> | <md5-hash>
702                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
703                 mme_del | <meid0> <meid1>...<meidn>
704
705
706         The pctx is our private context that must be used to send acks/status
707         messages back to the route manager.  The regular ctx is the ctx that
708         the user has been given and thus that's where we have to hang the route
709         table we're working with.
710
711         If mbuf is given, and we need to ack, then we ack using the mbuf and a
712         return to sender call (allows route manager to use wh_call() to send
713         an update and rts is required to get that back to the right thread).
714         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
715         will be used.
716 */
717 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
718         int i;
719         int ntoks;                                                      // number of tokens found in something
720         int ngtoks;
721         int     grp;                                                    // group number
722         rtable_ent_t*   rte;                            // route table entry added
723         char*   tokens[128];
724         char*   tok;                                            // pointer into a token or string
725         char    wbuf[1024];
726
727         if( ! buf ) {
728                 return;
729         }
730
731         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
732                 buf++;
733         }
734         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
735         *(tok+1) = 0;
736
737         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
738                 tokens[0] = clip( tokens[0] );
739                 switch( *(tokens[0]) ) {
740                         case 0:                                                                                                 // ignore blanks
741                                 // fallthrough
742                         case '#':                                                                                               // and comment lines
743                                 break;
744
745                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
746                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
747                                         break;
748                                 }
749
750                                 if( ntoks < 3 ) {
751                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
752                                         break;
753                                 }
754
755                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
756                                 ctx->new_rtable->updates++;
757                                 break;
758
759                         case 'n':                                                                                               // newrt|{start|end}
760                                 tokens[1] = clip( tokens[1] );
761                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
762                                         if( ntoks >2 ) {
763                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
764                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
765                                                                 ctx->new_rtable->updates, tokens[2] );
766                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
767                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
768                                                         uta_rt_drop( ctx->new_rtable );
769                                                         ctx->new_rtable = NULL;
770                                                         break;
771                                                 }
772                                         }
773
774                                         if( ctx->new_rtable ) {
775                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
776                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
777                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
778                                                 ctx->new_rtable = NULL;
779                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
780
781                                                 if( vlevel > 0 ) {
782                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
783                                                         rt_stats( ctx->old_rtable );
784                                                         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
785                                                         rt_stats( ctx->rtable );
786                                                 }
787
788                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
789                                         } else {
790                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
791                                                 ctx->new_rtable = NULL;
792                                         }
793                                 } else {                                                                                                                        // start a new table.
794                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
795                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
796
797                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
798                                                 uta_rt_drop( ctx->new_rtable );
799                                         }
800
801                                         if( ctx->table_id != NULL ) {
802                                                 free( ctx->table_id );
803                                         }
804                                         if( ntoks >2 ) {
805                                                 ctx->table_id = strdup( clip( tokens[2] ) );
806                                         } else {
807                                                 ctx->table_id = NULL;
808                                         }
809
810                                         ctx->new_rtable = NULL;
811                                         ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
812                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
813                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
814                                 }
815                                 break;
816
817                         case 'm':                                                                       // mse entry or one of the meid_ records
818                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
819                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
820                                                 break;
821                                         }
822
823                                         if( ntoks < 4 ) {
824                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
825                                                 break;
826                                         }
827
828                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
829                                         ctx->new_rtable->updates++;
830                                 } else {
831                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
832                                 }
833                                 break;
834
835                         case 'r':                                       // assume rt entry
836                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
837                                         break;
838                                 }
839
840                                 ctx->new_rtable->updates++;
841                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
842                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
843                                 } else {
844                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
845                                 }
846                                 break;
847
848                         case 'u':                                                                                               // update current table, not a total replacement
849                                 tokens[1] = clip( tokens[1] );
850                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
851                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
852                                                 break;
853                                         }
854
855                                         if( ntoks >2 ) {
856                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
857                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
858                                                                 ctx->new_rtable->updates, tokens[2] );
859                                                         uta_rt_drop( ctx->new_rtable );
860                                                         ctx->new_rtable = NULL;
861                                                         break;
862                                                 }
863                                         }
864
865                                         if( ctx->new_rtable ) {
866                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
867                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
868                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
869                                                 ctx->new_rtable = NULL;
870                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
871
872                                                 if( vlevel > 0 ) {
873                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
874                                                         rt_stats( ctx->old_rtable );
875                                                         rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
876                                                         rt_stats( ctx->rtable );
877                                                 }
878                                         } else {
879                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
880                                                 ctx->new_rtable = NULL;
881                                         }
882                                 } else {                                                                                        // start a new table.
883                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
884                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
885                                                 uta_rt_drop( ctx->new_rtable );
886                                         }
887
888                                         if( ntoks >2 ) {
889                                                 if( ctx->table_id != NULL ) {
890                                                         free( ctx->table_id );
891                                                 }
892                                                 ctx->table_id = strdup( clip( tokens[2] ) );
893                                         }
894
895                                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
896                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
897                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
898                                 }
899                                 break;
900
901                         default:
902                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
903                                 break;
904                 }
905         }
906 }
907
908 /*
909         This function attempts to open a static route table in order to create a 'seed'
910         table during initialisation.  The environment variable RMR_SEED_RT is expected
911         to contain the necessary path to the file. If missing, or if the file is empty,
912         no route table will be available until one is received from the generator.
913
914         This function is probably most useful for testing situations, or extreme
915         cases where the routes are static.
916 */
917 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
918         int             i;
919         char*   fname;
920         char*   fbuf;                           // buffer with file contents
921         char*   rec;                            // start of the record
922         char*   eor;                            // end of the record
923         int             rcount = 0;                     // record count for debug
924
925         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
926                 return;
927         }
928
929         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
930                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
931                 return;
932         }
933
934         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
935         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
936                 if( *eor == '\r' ) {
937                         *eor = '\n';                                                            // will look like a blank line which is ok
938                 }
939         }
940
941         rec = fbuf;
942         while( rec && *rec ) {
943                 rcount++;
944                 if( (eor = strchr( rec, '\n' )) != NULL ) {
945                         *eor = 0;
946                 } else {
947                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
948                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
949                         free( fbuf );
950                         return;
951                 }
952
953                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
954
955                 rec = eor+1;
956         }
957
958         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
959         free( fbuf );
960 }
961
962 /*
963         Callback driven for each named thing in a symtab. We collect the pointers to those
964         things for later use (cloning).
965 */
966 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
967         thing_list_t*   tl;
968
969         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
970                 return;
971         }
972
973         if( thing == NULL ) {
974                 return;
975         }
976
977         tl->names[tl->nused] = name;                    // the name/key
978         tl->things[tl->nused++] = thing;                // save a reference to the thing
979 }
980
981 /*
982         Called to delete a route table entry struct. We delete the array of endpoint
983         pointers, but NOT the endpoints referenced as those are referenced from
984         multiple entries.
985
986         Route table entries can be concurrently referenced by multiple symtabs, so
987         the actual delete happens only if decrementing the rte's ref count takes it
988         to 0. Thus, it is safe to call this function across a symtab when cleaning up
989         the symtab, or overlaying an entry.
990
991         This function uses ONLY the pointer to the rte (thing) and ignores the other
992         information that symtab foreach function passes (st, entry, and data) which
993         means that it _can_ safetly be used outside of the foreach setting. If
994         the function is changed to depend on any of these three, then a stand-alone
995         rte_cleanup() function should be added and referenced by this, and refererences
996         to this outside of the foreach world should be changed.
997 */
998 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
999         rtable_ent_t*   rte;
1000         int i;
1001
1002         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1003                 return;
1004         }
1005
1006         rte->refs--;
1007         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1008                 return;
1009         }
1010
1011         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1012                 for( i = 0; i < rte->nrrgroups; i++ ) {
1013                         if( rte->rrgroups[i] ) {
1014                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1015                         }
1016                 }
1017
1018                 free( rte->rrgroups );
1019         }
1020
1021         free( rte );                                                                                    // finally, drop the potato
1022 }
1023
1024 /*
1025         Read an entire file into a buffer. We assume for route table files
1026         they will be smallish and so this won't be a problem.
1027         Returns a pointer to the buffer, or nil. Caller must free.
1028         Terminates the buffer with a nil character for string processing.
1029
1030         If we cannot stat the file, we assume it's empty or missing and return
1031         an empty buffer, as opposed to a nil, so the caller can generate defaults
1032         or error if an empty/missing file isn't tolerated.
1033 */
1034 static char* uta_fib( char* fname ) {
1035         struct stat     stats;
1036         off_t           fsize = 8192;   // size of the file
1037         off_t           nread;                  // number of bytes read
1038         int                     fd;
1039         char*           buf;                    // input buffer
1040
1041         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1042                 if( fstat( fd, &stats ) >= 0 ) {
1043                         if( stats.st_size <= 0 ) {                                      // empty file
1044                                 close( fd );
1045                                 fd = -1;
1046                         } else {
1047                                 fsize = stats.st_size;                                          // stat ok, save the file size
1048                         }
1049                 } else {
1050                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1051                 }
1052         }
1053
1054         if( fd < 0 ) {                                                                                  // didn't open or empty
1055                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1056                         return NULL;
1057                 }
1058
1059                 *buf = 0;
1060                 return buf;
1061         }
1062
1063         // add a size limit check here
1064
1065         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1066                 close( fd );
1067                 errno = ENOMEM;
1068                 return NULL;
1069         }
1070
1071         nread = read( fd, buf, fsize );
1072         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1073                 free( buf );
1074                 errno = EFBIG;                                                                                  // likely too much to handle
1075                 close( fd );
1076                 return NULL;
1077         }
1078
1079         buf[nread] = 0;
1080
1081         close( fd );
1082         return buf;
1083 }
1084
1085 /*
1086         Create and initialise a route table; Returns a pointer to the table struct.
1087 */
1088 static route_table_t* uta_rt_init( ) {
1089         route_table_t*  rt;
1090
1091         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1092                 return NULL;
1093         }
1094
1095         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1096                 free( rt );
1097                 return NULL;
1098         }
1099
1100         return rt;
1101 }
1102
1103 /*
1104         Clones one of the spaces in the given table.
1105         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1106         Space is the space in the old table to copy. Space 0 uses an integer key and
1107         references rte structs. All other spaces use a string key and reference endpoints.
1108 */
1109 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
1110         endpoint_t*             ep;             // an endpoint
1111         rtable_ent_t*   rte;    // a route table entry
1112         void*   sst;                    // source symtab
1113         void*   nst;                    // new symtab
1114         thing_list_t things;    // things from the space to copy
1115         int             i;
1116         int             free_on_err = 0;
1117
1118         if( nrt == NULL ) {                             // make a new table if needed
1119                 free_on_err = 1;
1120                 nrt = uta_rt_init();
1121         }
1122
1123         if( srt == NULL ) {             // source was nil, just give back the new table
1124                 return nrt;
1125         }
1126
1127         things.nalloc = 2048;
1128         things.nused = 0;
1129         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1130         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1131         if( things.things == NULL ) {
1132                 if( free_on_err ) {
1133                         free( nrt->hash );
1134                         free( nrt );
1135                         nrt = NULL;
1136                 }
1137
1138                 return nrt;
1139         }
1140
1141         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1142         nst = nrt->hash;
1143
1144         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1145
1146         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1147         for( i = 0; i < things.nused; i++ ) {
1148                 if( space ) {                                                                                           // string key, epoint reference
1149                         ep = (endpoint_t *) things.things[i];
1150                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1151                 } else {
1152                         rte = (rtable_ent_t *) things.things[i];
1153                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1154                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1155                 }
1156         }
1157
1158         free( things.things );
1159         free( (void *) things.names );
1160         return nrt;
1161 }
1162
1163 /*
1164         Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1165         The endpoint and meid entries in the hash must be preserved.
1166 */
1167 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1168         endpoint_t*             ep;                             // an endpoint
1169         rtable_ent_t*   rte;                    // a route table entry
1170         route_table_t*  nrt = NULL;             // new route table
1171         int i;
1172
1173         if( srt == NULL ) {
1174                 return uta_rt_init();           // no source to clone, just return an empty table
1175         }
1176
1177         nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
1178         rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
1179
1180         return nrt;
1181 }
1182
1183 /*
1184         Creates a new route table and then clones  _all_ of the given route table (references 
1185         both endpoints AND the route table entries. Needed to support a partial update where 
1186         some route table entries will not be deleted if not explicitly in the update and when 
1187         we are adding/replacing meid references.
1188 */
1189 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1190         endpoint_t*             ep;                             // an endpoint
1191         rtable_ent_t*   rte;                    // a route table entry
1192         route_table_t*  nrt = NULL;             // new route table
1193         int i;
1194
1195         if( srt == NULL ) {
1196                 return uta_rt_init();           // no source to clone, just return an empty table
1197         }
1198
1199         nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
1200         rt_clone_space( srt, nrt, RT_NAME_SPACE );
1201         rt_clone_space( srt, nrt, RT_ME_SPACE );
1202
1203         return nrt;
1204 }
1205
1206 /*
1207         Given a name, find the endpoint struct in the provided route table.
1208 */
1209 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1210
1211         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1212                 return NULL;
1213         }
1214
1215         return rmr_sym_get( rt->hash, ep_name, 1 );
1216 }
1217
1218 /*
1219         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1220 */
1221 static void uta_rt_drop( route_table_t* rt ) {
1222         if( rt == NULL ) {
1223                 return;
1224         }
1225
1226         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1227         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1228         free( rt );
1229 }
1230
1231 /*
1232         Look up and return the pointer to the endpoint stuct matching the given name.
1233         If not in the hash, a new endpoint is created, added to the hash. Should always
1234         return a pointer.
1235 */
1236 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1237         endpoint_t*     ep;
1238
1239         if( !rt || !ep_name || ! *ep_name ) {
1240                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1241                 errno = EINVAL;
1242                 return NULL;
1243         }
1244
1245         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1246                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1247                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1248                         errno = ENOMEM;
1249                         return NULL;
1250                 }
1251
1252                 ep->notify = 1;                                                         // show notification on first connection failure
1253                 ep->open = 0;                                                           // not connected
1254                 ep->addr = uta_h2ip( ep_name );
1255                 ep->name = strdup( ep_name );
1256                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1257                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1258
1259                 rmr_sym_put( rt->hash, ep_name, 1, ep );
1260         }
1261
1262         return ep;
1263 }
1264
1265
1266 /*
1267         Given a session id and message type build a key that can be used to look up the rte in the route
1268         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1269 */
1270 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1271         uint64_t key;
1272
1273         if( sub_id == UNSET_SUBID ) {
1274                 key = 0xffffffff00000000 | mtype;
1275         } else {
1276                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1277         }
1278
1279         return key;
1280 }
1281
1282 /*
1283         Given a route table and meid string, find the owner (if known). Returns a pointer to
1284         the endpoint struct or nil.
1285 */
1286 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
1287         endpoint_t* ep;         // the ep we found in the hash
1288
1289         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1290                 return NULL;
1291         }
1292
1293         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
1294 }
1295
1296 #endif