Add the wormhole call function
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48
49 #include <RIC_message_types.h>          // needed for route manager messages
50
51
52 /*
53         Passed to a symtab foreach callback to construct a list of pointers from
54         a current symtab.
55 */
56 typedef struct thing_list {
57         int nalloc;
58         int nused;
59         void** things;
60         const char** names;
61 } thing_list_t;
62
63 // ---- debugging/testing -------------------------------------------------------------------------
64
65 /*
66         Dump some stats for an endpoint in the RT. This is generally called to
67         verify endpoints after a table load/change.
68 */
69 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
70         int*    counter;
71         endpoint_t* ep;
72
73         if( (ep = (endpoint_t *) thing) == NULL ) {
74                 return;
75         }
76
77         if( (counter = (int *) vcounter) != NULL ) {
78                 (*counter)++;
79         }
80
81         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
82 }
83
84 /*
85         Called to count meid entries in the table. The meid points to an 'owning' endpoint
86         so we can list what we find
87 */
88 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
89         int*    counter;
90         endpoint_t* ep;
91
92         if( (ep = (endpoint_t *) thing) == NULL ) {
93                 return;
94         }
95
96         if( (counter = (int *) vcounter) != NULL ) {
97                 (*counter)++;
98         }
99
100         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
101 }
102
103 /*
104         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
105         the 'source' information and is added to each message.
106 */
107 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
108         endpoint_t* ep;
109         char*   id;
110
111         if( (ep = (endpoint_t *) thing) == NULL ) {
112                 return;
113         }
114
115         if( (id = (char *) vid) == NULL ) {
116                 id = "missing";
117         }
118
119         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
120                 (long long) time( NULL ),
121                 id,
122                 ep->name,
123                 ep->open,
124                 ep->scounts[EPSC_GOOD],
125                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
126                 ep->scounts[EPSC_FAIL],
127                 ep->scounts[EPSC_TRANS]   );
128 }
129
130 /*
131         Dump stats for a route entry in the table.
132 */
133 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
134         int*    counter;
135         rtable_ent_t* rte;                      // thing is really an rte
136         int             mtype;
137         int             sid;
138
139         if( (rte = (rtable_ent_t *) thing) == NULL ) {
140                 return;
141         }
142
143         if( (counter = (int *) vcounter) != NULL ) {
144                 (*counter)++;
145         }
146
147         mtype = rte->key & 0xffff;
148         sid = (int) (rte->key >> 32);
149
150         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
151 }
152
153 /*
154         Given a route table, cause some stats to be spit out.
155 */
156 static void  rt_stats( route_table_t* rt ) {
157         int* counter;
158
159         if( rt == NULL ) {
160                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
161                 return;
162         }
163
164         counter = (int *) malloc( sizeof( int ) );
165         *counter = 0;
166         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
167         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
168         rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
169         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
170
171         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
172         *counter = 0;
173         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
174         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
175
176         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
177         *counter = 0;
178         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
179         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
180
181         free( counter );
182 }
183
184 /*
185         Given a route table, cause endpoint counters to be written to stderr. The id
186         parm is written as the "source" in the output.
187 */
188 static void  rt_epcounts( route_table_t* rt, char* id ) {
189         if( rt == NULL ) {
190                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
191                 return;
192         }
193
194         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
195 }
196
197 // ------------ route manager communication -------------------------------------------------
198 /*
199         Send a request for a table update to the route manager. Updates come in
200         async, so send and go.
201
202         pctx is the private context for the thread; ctx is the application context
203         that we need to be able to send the application ID in case rt mgr needs to
204         use it to idenfity us.
205
206         Returns 0 if we were not able to send a request.
207 */
208 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
209         rmr_mbuf_t*     smsg;
210         int     state = 0;
211
212         if( ctx->rtg_whid < 0 ) {
213                 return state;
214         }
215
216         smsg = rmr_alloc_msg( pctx, 1024 );
217         if( smsg != NULL ) {
218                 smsg->mtype = RMRRM_REQ_TABLE;
219                 smsg->sub_id = 0;
220                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
221                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
222                 smsg->len = strlen( smsg->payload ) + 1;
223         
224                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
225                 if( (state = smsg->state) != RMR_OK ) {
226                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
227                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
228                         ctx->rtg_whid = -1;
229                 }
230
231                 rmr_free_msg( smsg );
232         }
233
234         return state;
235 }
236
237 /*
238         Send an ack to the route table manager for a table ID that we are
239         processing.      State is 1 for OK, and 0 for failed. Reason might 
240         be populated if we know why there was a failure.
241
242         Context should be the PRIVATE context that we use for messages
243         to route manger and NOT the user's context.
244
245         If a message buffere is passed we use that and use return to sender
246         assuming that this might be a response to a call and that is needed
247         to send back to the proper calling thread. If msg is nil, we allocate
248         and use it.
249 */
250 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
251         int             use_rts = 1;
252         int             payload_size = 1024;
253         
254         if( ctx == NULL || ctx->rtg_whid < 0 ) {
255                 return;
256         }
257
258         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
259                 return;
260         }
261
262         if( smsg != NULL ) {
263                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
264         } else {
265                 use_rts = 0;
266                 smsg = rmr_alloc_msg( ctx, payload_size );
267         }
268
269         if( smsg != NULL ) {
270                 smsg->mtype = RMRRM_TABLE_STATE;
271                 smsg->sub_id = -1;
272                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
273                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
274
275                 smsg->len = strlen( smsg->payload ) + 1;
276         
277                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
278                 if( use_rts ) {
279                         smsg = rmr_rts_msg( ctx, smsg );
280                 } else {
281                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
282                 }
283                 if( (state = smsg->state) != RMR_OK ) {
284                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
285                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
286                         ctx->rtg_whid = -1;
287                 }
288
289                 if( ! use_rts ) {
290                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
291                 }
292         }
293 }
294
295 // ------------------------------------------------------------------------------------------------
296 /*
297         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
298         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
299 */
300 static char* clip( char* buf ) {
301         char*   tok;
302
303         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
304                 buf++;
305         }
306
307         if( (tok = strchr( buf, '#' )) != NULL ) {
308                 if( tok == buf ) {
309                         return buf;                                     // just push back; leading comment sym handled there
310                 }
311
312                 if( isspace( *(tok-1) ) ) {
313                         *tok = 0;
314                 }
315         }
316
317         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
318         *(tok+1) = 0;
319
320         return buf;
321 }
322
323 /*
324         This accepts a pointer to a nil terminated string, and ensures that there is a
325         newline as the last character. If there is not, a new buffer is allocated and
326         the newline is added.  If a new buffer is allocated, the buffer passed in is
327         freed.  The function returns a pointer which the caller should use, and must
328         free.  In the event of an error, a nil pointer is returned.
329 */
330 static char* ensure_nlterm( char* buf ) {
331         char*   nb = NULL;
332         int             len = 1;
333
334         nb = buf;
335         if( buf == NULL || (len = strlen( buf )) < 2 ) {
336                 if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
337                         *nb = '\n';
338                         *(nb+1) = 0;
339                 }
340         } else {
341                 if( buf[len-1] != '\n' ) {
342                         rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
343                         if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
344                                 memcpy( nb, buf, len );
345                                 *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
346                                 *(nb+len+1) = 0;
347                         }
348
349                         free( buf );
350                 }
351         }
352
353         return nb;
354 }
355
356 /*
357         Given a message type create a route table entry and add to the hash keyed on the
358         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
359         is the number of group slots to allocate in the entry.
360 */
361 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
362         rtable_ent_t* rte;
363         rtable_ent_t* old_rte;          // entry which was already in the table for the key
364
365         if( rt == NULL ) {
366                 return NULL;
367         }
368
369         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
370                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
371                 return NULL;
372         }
373         memset( rte, 0, sizeof( *rte ) );
374         rte->refs = 1;
375         rte->key = key;
376
377         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
378                 nrrgroups = 10;
379         }
380
381         if( nrrgroups ) {
382                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
383                         free( rte );
384                         return NULL;
385                 }
386                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
387         } else {
388                 rte->rrgroups = NULL;
389         }
390
391         rte->nrrgroups = nrrgroups;
392
393         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
394                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
395         }
396
397         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
398
399         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
400         return rte;
401 }
402
403 /*
404         This accepts partially parsed information from an rte or mse record sent by route manager or read from
405         a file such that:
406                 ts_field is the msg-type,sender field
407                 subid is the integer subscription id
408                 rr_field is the endpoint information for round robening message over
409
410         If all goes well, this will add an RTE to the table under construction.
411
412         The ts_field is checked to see if we should ingest this record. We ingest if one of
413         these is true:
414                 there is no sender info (a generic entry for all)
415                 there is sender and our host:port matches one of the senders
416                 the sender info is an IP address that matches one of our IP addresses
417 */
418 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
419         rtable_ent_t*   rte;            // route table entry added
420         char*   tok;
421         int             ntoks;
422         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
423         char*   tokens[128];
424         char*   gtokens[64];
425         int             i;
426         int             ngtoks;                         // number of tokens in the group list
427         int             grp;                            // index into group list
428
429         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
430         rr_field = clip( rr_field );
431
432         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
433                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
434                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
435
436                 key = build_rt_key( subid, atoi( ts_field ) );
437
438                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
439
440                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
441                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
442                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
443                         }
444                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
445                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
446
447                         for( grp = 0; grp < ngtoks; grp++ ) {
448                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
449                                         for( i = 0; i < ntoks; i++ ) {
450                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
451                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
452                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
453                                                 }
454                                         }
455                                 }
456                         }
457                 }
458         } else {
459                 if( DEBUG || (vlevel > 2) ) {
460                         rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
461                 }
462         }
463 }
464
465 /*
466         Trash_entry takes a partially parsed record from the input and
467         will delete the entry if the sender,mtype matches us or it's a
468         generic mtype. The refernce in the new table is removed and the
469         refcounter for the actual rte is decreased. If that ref count is
470         0 then the memory is freed (handled byh the del_rte call).
471 */
472 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
473         rtable_ent_t*   rte;            // route table entry to be 'deleted'
474         char*   tok;
475         int             ntoks;
476         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
477         char*   tokens[128];
478
479         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
480                 return;
481         }
482
483         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
484
485         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
486                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
487                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
488
489                 key = build_rt_key( subid, atoi( ts_field ) );
490                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
491                 if( rte != NULL ) {
492                         if( DEBUG || (vlevel > 1) ) {
493                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
494                         }
495                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
496                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
497                 } else {
498                         if( DEBUG || (vlevel > 1) ) {
499                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
500                         }
501                 }
502         } else {
503                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
504         }
505 }
506
507 /*
508         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
509         the 'owner' which should be the dns name or IP address of an enpoint
510         the meid_list is a space separated list of me IDs
511
512         This function assumes the caller has vetted the pointers as needed.
513
514         For each meid in the list, an entry is pushed into the hash which references the owner
515         endpoint such that when the meid is used to route a message it references the endpoint
516         to send messages to.
517 */
518 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
519         char*   tok;
520         int             ntoks;
521         char*   tokens[128];
522         int             i;
523         int             state;
524         endpoint_t*     ep;                                             // endpoint struct for the owner
525
526         owner = clip( owner );                          // ditch extra whitespace and trailing comments
527         meid_list = clip( meid_list );
528
529         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
530         for( i = 0; i < ntoks; i++ ) {
531                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
532                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
533                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
534                 } else {
535                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
536                 }
537         }
538 }
539
540 /*
541         Given the tokens from an mme_del, delete the listed meid entries from the new
542         table. The list is a space separated list of meids.
543
544         The meids in the hash reference endpoints which are never deleted and so
545         the only thing that we need to do here is to remove the meid from the hash.
546
547         This function assumes the caller has vetted the pointers as needed.
548 */
549 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
550         char*   tok;
551         int             ntoks;
552         char*   tokens[128];
553         int             i;
554
555         if( rtab->hash == NULL ) {
556                 return;
557         }
558
559         meid_list = clip( meid_list );
560
561         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
562         for( i = 0; i < ntoks; i++ ) {
563                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
564                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
565         }
566 }
567
568 /*
569         Parse a partially parsed meid record. Tokens[0] should be one of:
570                 meid_map, mme_ar, mme_del.
571 */
572 static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
573         if( tokens == NULL || ntoks < 1 ) {
574                 return;                                                 // silent but should never happen
575         }
576
577         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
578                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
579                 return;
580         }
581
582         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
583                 tokens[1] = clip( tokens[1] );
584                 if( *(tokens[1]) == 's' ) {
585                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
586                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
587                                 uta_rt_drop( ctx->new_rtable );
588                         }
589
590                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
591                         ctx->new_rtable->mupdates = 0;
592                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
593                 } else {
594                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
595                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
596                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
597                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
598                                                 uta_rt_drop( ctx->new_rtable );
599                                                 ctx->new_rtable = NULL;
600                                                 return;
601                                         }
602
603                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
604                                 }
605
606                                 if( ctx->new_rtable ) {
607                                         uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
608                                         ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
609                                         ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
610                                         ctx->new_rtable = NULL;
611                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
612
613                                         if( vlevel > 0 ) {
614                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
615                                                 rt_stats( ctx->old_rtable );
616                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
617                                                 rt_stats( ctx->rtable );
618                                         }
619                                 } else {
620                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
621                                         ctx->new_rtable = NULL;
622                                 }
623                         }
624                 }
625
626                 return;
627         }       
628
629         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
630                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
631                 return;
632         }
633
634         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
635                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
636                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
637                         return;
638                 }
639                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
640                 ctx->new_rtable->mupdates++;
641         }
642
643         if( strcmp( tokens[0], "mme_del" ) == 0 ) {
644                 if( ntoks < 2 ) {
645                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
646                         return;
647                 }
648                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
649                 ctx->new_rtable->mupdates++;
650         }
651 }
652
653 /*
654         Parse a single record recevied from the route table generator, or read
655         from a static route table file.  Start records cause a new table to
656         be started (if a partial table was received it is discarded. Table
657         entry records are added to the currenly 'in progress' table, and an
658         end record causes the in progress table to be finalised and the
659         currently active table is replaced.
660
661         The updated table will be activated when the *|end record is encountered.
662         However, to allow for a "double" update, where both the meid map and the
663         route table must be updated at the same time, the end indication on a
664         route table (new or update) may specifiy "hold" which indicates that meid
665         map entries are to follow and the updated route table should be held as
666         pending until the end of the meid map is received and validated.
667
668         CAUTION:  we are assuming that there is a single route/meid map generator
669                 and as such only one type of update is received at a time; in other
670                 words, the sender cannot mix update records and if there is more than
671                 one sender process they must synchronise to avoid issues.
672
673
674         For a RT update, we expect:
675                 newrt | start | <table-id>
676                 newrt | end | <count>
677                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
678                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
679                 mse| <mtype>[,sender] | <sub-id> | %meid
680
681
682         For a meid map update we expect:
683                 meid_map | start | <table-id>
684                 meid_map | end | <count> | <md5-hash>
685                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
686                 mme_del | <meid0> <meid1>...<meidn>
687
688
689         The pctx is our private context that must be used to send acks/status
690         messages back to the route manager.  The regular ctx is the ctx that
691         the user has been given and thus that's where we have to hang the route
692         table we're working with.
693
694         If mbuf is given, and we need to ack, then we ack using the mbuf and a
695         return to sender call (allows route manager to use wh_call() to send
696         an update and rts is required to get that back to the right thread).
697         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
698         will be used.
699 */
700 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
701         int i;
702         int ntoks;                                                      // number of tokens found in something
703         int ngtoks;
704         int     grp;                                                    // group number
705         rtable_ent_t*   rte;                            // route table entry added
706         char*   tokens[128];
707         char*   tok;                                            // pointer into a token or string
708         char    wbuf[1024];
709
710         if( ! buf ) {
711                 return;
712         }
713
714         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
715                 buf++;
716         }
717         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
718         *(tok+1) = 0;
719
720         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
721                 tokens[0] = clip( tokens[0] );
722                 switch( *(tokens[0]) ) {
723                         case 0:                                                                                                 // ignore blanks
724                                 // fallthrough
725                         case '#':                                                                                               // and comment lines
726                                 break;
727
728                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
729                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
730                                         break;
731                                 }
732
733                                 if( ntoks < 3 ) {
734                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
735                                         break;
736                                 }
737
738                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
739                                 ctx->new_rtable->updates++;
740                                 break;
741
742                         case 'n':                                                                                               // newrt|{start|end}
743                                 tokens[1] = clip( tokens[1] );
744                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
745                                         if( ntoks >2 ) {
746                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
747                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
748                                                                 ctx->new_rtable->updates, tokens[2] );
749                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
750                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
751                                                         uta_rt_drop( ctx->new_rtable );
752                                                         ctx->new_rtable = NULL;
753                                                         break;
754                                                 }
755                                         }
756
757                                         if( ctx->new_rtable ) {
758                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
759                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
760                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
761                                                 ctx->new_rtable = NULL;
762                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
763
764                                                 if( vlevel > 0 ) {
765                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
766                                                         rt_stats( ctx->old_rtable );
767                                                         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
768                                                         rt_stats( ctx->rtable );
769                                                 }
770
771                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
772                                         } else {
773                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
774                                                 ctx->new_rtable = NULL;
775                                         }
776                                 } else {                                                                                                                        // start a new table.
777                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
778                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
779
780                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
781                                                 uta_rt_drop( ctx->new_rtable );
782                                         }
783
784                                         if( ctx->table_id != NULL ) {
785                                                 free( ctx->table_id );
786                                         }
787                                         if( ntoks >2 ) {
788                                                 ctx->table_id = strdup( clip( tokens[2] ) );
789                                         } else {
790                                                 ctx->table_id = NULL;
791                                         }
792
793                                         ctx->new_rtable = NULL;
794                                         ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
795                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
796                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
797                                 }
798                                 break;
799
800                         case 'm':                                                               // mse entry or one of the meid_ records
801                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
802                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
803                                                 break;
804                                         }
805
806                                         if( ntoks < 4 ) {
807                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
808                                                 break;
809                                         }
810
811                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
812                                         ctx->new_rtable->updates++;
813                                 } else {
814                                         meid_parser( ctx, tokens, ntoks, vlevel );
815                                 }
816                                 break;
817
818                         case 'r':                                       // assume rt entry
819                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
820                                         break;
821                                 }
822
823                                 ctx->new_rtable->updates++;
824                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
825                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
826                                 } else {
827                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
828                                 }
829                                 break;
830
831                         case 'u':                                                                                               // update current table, not a total replacement
832                                 tokens[1] = clip( tokens[1] );
833                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
834                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
835                                                 break;
836                                         }
837
838                                         if( ntoks >2 ) {
839                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
840                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
841                                                                 ctx->new_rtable->updates, tokens[2] );
842                                                         uta_rt_drop( ctx->new_rtable );
843                                                         ctx->new_rtable = NULL;
844                                                         break;
845                                                 }
846                                         }
847
848                                         if( ctx->new_rtable ) {
849                                                 uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
850                                                 ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
851                                                 ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
852                                                 ctx->new_rtable = NULL;
853                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
854
855                                                 if( vlevel > 0 ) {
856                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
857                                                         rt_stats( ctx->old_rtable );
858                                                         rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
859                                                         rt_stats( ctx->rtable );
860                                                 }
861                                         } else {
862                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
863                                                 ctx->new_rtable = NULL;
864                                         }
865                                 } else {                                                                                        // start a new table.
866                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
867                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
868                                                 uta_rt_drop( ctx->new_rtable );
869                                         }
870
871                                         if( ntoks >2 ) {
872                                                 if( ctx->table_id != NULL ) {
873                                                         free( ctx->table_id );
874                                                 }
875                                                 ctx->table_id = strdup( clip( tokens[2] ) );
876                                         }
877
878                                         ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
879                                         ctx->new_rtable->updates = 0;                                           // init count of updates received
880                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
881                                 }
882                                 break;
883
884                         default:
885                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
886                                 break;
887                 }
888         }
889 }
890
891 /*
892         This function attempts to open a static route table in order to create a 'seed'
893         table during initialisation.  The environment variable RMR_SEED_RT is expected
894         to contain the necessary path to the file. If missing, or if the file is empty,
895         no route table will be available until one is received from the generator.
896
897         This function is probably most useful for testing situations, or extreme
898         cases where the routes are static.
899 */
900 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
901         int             i;
902         char*   fname;
903         char*   fbuf;                           // buffer with file contents
904         char*   rec;                            // start of the record
905         char*   eor;                            // end of the record
906         int             rcount = 0;                     // record count for debug
907
908         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
909                 return;
910         }
911
912         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
913                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
914                 return;
915         }
916
917         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
918         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
919                 if( *eor == '\r' ) {
920                         *eor = '\n';                                                            // will look like a blank line which is ok
921                 }
922         }
923
924         rec = fbuf;
925         while( rec && *rec ) {
926                 rcount++;
927                 if( (eor = strchr( rec, '\n' )) != NULL ) {
928                         *eor = 0;
929                 } else {
930                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
931                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
932                         free( fbuf );
933                         return;
934                 }
935
936                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
937
938                 rec = eor+1;
939         }
940
941         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
942         free( fbuf );
943 }
944
945 /*
946         Callback driven for each named thing in a symtab. We collect the pointers to those
947         things for later use (cloning).
948 */
949 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
950         thing_list_t*   tl;
951
952         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
953                 return;
954         }
955
956         if( thing == NULL ) {
957                 return;
958         }
959
960         tl->names[tl->nused] = name;                    // the name/key
961         tl->things[tl->nused++] = thing;                // save a reference to the thing
962 }
963
964 /*
965         Called to delete a route table entry struct. We delete the array of endpoint
966         pointers, but NOT the endpoints referenced as those are referenced from
967         multiple entries.
968
969         Route table entries can be concurrently referenced by multiple symtabs, so
970         the actual delete happens only if decrementing the rte's ref count takes it
971         to 0. Thus, it is safe to call this function across a symtab when cleaning up
972         the symtab, or overlaying an entry.
973
974         This function uses ONLY the pointer to the rte (thing) and ignores the other
975         information that symtab foreach function passes (st, entry, and data) which
976         means that it _can_ safetly be used outside of the foreach setting. If
977         the function is changed to depend on any of these three, then a stand-alone
978         rte_cleanup() function should be added and referenced by this, and refererences
979         to this outside of the foreach world should be changed.
980 */
981 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
982         rtable_ent_t*   rte;
983         int i;
984
985         if( (rte = (rtable_ent_t *) thing) == NULL ) {
986                 return;
987         }
988
989         rte->refs--;
990         if( rte->refs > 0 ) {                   // something still referencing, so it lives
991                 return;
992         }
993
994         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
995                 for( i = 0; i < rte->nrrgroups; i++ ) {
996                         if( rte->rrgroups[i] ) {
997                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
998                         }
999                 }
1000
1001                 free( rte->rrgroups );
1002         }
1003
1004         free( rte );                                                                                    // finally, drop the potato
1005 }
1006
1007 /*
1008         Read an entire file into a buffer. We assume for route table files
1009         they will be smallish and so this won't be a problem.
1010         Returns a pointer to the buffer, or nil. Caller must free.
1011         Terminates the buffer with a nil character for string processing.
1012
1013         If we cannot stat the file, we assume it's empty or missing and return
1014         an empty buffer, as opposed to a nil, so the caller can generate defaults
1015         or error if an empty/missing file isn't tolerated.
1016 */
1017 static char* uta_fib( char* fname ) {
1018         struct stat     stats;
1019         off_t           fsize = 8192;   // size of the file
1020         off_t           nread;                  // number of bytes read
1021         int                     fd;
1022         char*           buf;                    // input buffer
1023
1024         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1025                 if( fstat( fd, &stats ) >= 0 ) {
1026                         if( stats.st_size <= 0 ) {                                      // empty file
1027                                 close( fd );
1028                                 fd = -1;
1029                         } else {
1030                                 fsize = stats.st_size;                                          // stat ok, save the file size
1031                         }
1032                 } else {
1033                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1034                 }
1035         }
1036
1037         if( fd < 0 ) {                                                                                  // didn't open or empty
1038                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1039                         return NULL;
1040                 }
1041
1042                 *buf = 0;
1043                 return buf;
1044         }
1045
1046         // add a size limit check here
1047
1048         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1049                 close( fd );
1050                 errno = ENOMEM;
1051                 return NULL;
1052         }
1053
1054         nread = read( fd, buf, fsize );
1055         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1056                 free( buf );
1057                 errno = EFBIG;                                                                                  // likely too much to handle
1058                 close( fd );
1059                 return NULL;
1060         }
1061
1062         buf[nread] = 0;
1063
1064         close( fd );
1065         return buf;
1066 }
1067
1068 /*
1069         Create and initialise a route table; Returns a pointer to the table struct.
1070 */
1071 static route_table_t* uta_rt_init( ) {
1072         route_table_t*  rt;
1073
1074         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1075                 return NULL;
1076         }
1077
1078         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1079                 free( rt );
1080                 return NULL;
1081         }
1082
1083         return rt;
1084 }
1085
1086 /*
1087         Clones one of the spaces in the given table.
1088         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1089         Space is the space in the old table to copy. Space 0 uses an integer key and
1090         references rte structs. All other spaces use a string key and reference endpoints.
1091 */
1092 static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
1093         endpoint_t*             ep;             // an endpoint
1094         rtable_ent_t*   rte;    // a route table entry
1095         void*   sst;                    // source symtab
1096         void*   nst;                    // new symtab
1097         thing_list_t things;    // things from the space to copy
1098         int             i;
1099         int             free_on_err = 0;
1100
1101         if( nrt == NULL ) {                             // make a new table if needed
1102                 free_on_err = 1;
1103                 nrt = uta_rt_init();
1104         }
1105
1106         if( srt == NULL ) {             // source was nil, just give back the new table
1107                 return nrt;
1108         }
1109
1110         things.nalloc = 2048;
1111         things.nused = 0;
1112         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1113         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1114         if( things.things == NULL ) {
1115                 if( free_on_err ) {
1116                         free( nrt->hash );
1117                         free( nrt );
1118                         nrt = NULL;
1119                 }
1120
1121                 return nrt;
1122         }
1123
1124         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1125         nst = nrt->hash;
1126
1127         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1128
1129         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1130         for( i = 0; i < things.nused; i++ ) {
1131                 if( space ) {                                                                                           // string key, epoint reference
1132                         ep = (endpoint_t *) things.things[i];
1133                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1134                 } else {
1135                         rte = (rtable_ent_t *) things.things[i];
1136                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1137                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1138                 }
1139         }
1140
1141         free( things.things );
1142         free( (void *) things.names );
1143         return nrt;
1144 }
1145
1146 /*
1147         Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
1148         The endpoint and meid entries in the hash must be preserved.
1149 */
1150 static route_table_t* uta_rt_clone( route_table_t* srt ) {
1151         endpoint_t*             ep;                             // an endpoint
1152         rtable_ent_t*   rte;                    // a route table entry
1153         route_table_t*  nrt = NULL;             // new route table
1154         int i;
1155
1156         if( srt == NULL ) {
1157                 return uta_rt_init();           // no source to clone, just return an empty table
1158         }
1159
1160         nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
1161         rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
1162
1163         return nrt;
1164 }
1165
1166 /*
1167         Creates a new route table and then clones  _all_ of the given route table (references 
1168         both endpoints AND the route table entries. Needed to support a partial update where 
1169         some route table entries will not be deleted if not explicitly in the update and when 
1170         we are adding/replacing meid references.
1171 */
1172 static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
1173         endpoint_t*             ep;                             // an endpoint
1174         rtable_ent_t*   rte;                    // a route table entry
1175         route_table_t*  nrt = NULL;             // new route table
1176         int i;
1177
1178         if( srt == NULL ) {
1179                 return uta_rt_init();           // no source to clone, just return an empty table
1180         }
1181
1182         nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
1183         rt_clone_space( srt, nrt, RT_NAME_SPACE );
1184         rt_clone_space( srt, nrt, RT_ME_SPACE );
1185
1186         return nrt;
1187 }
1188
1189 /*
1190         Given a name, find the endpoint struct in the provided route table.
1191 */
1192 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1193
1194         if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
1195                 return NULL;
1196         }
1197
1198         return rmr_sym_get( rt->hash, ep_name, 1 );
1199 }
1200
1201 /*
1202         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1203 */
1204 static void uta_rt_drop( route_table_t* rt ) {
1205         if( rt == NULL ) {
1206                 return;
1207         }
1208
1209         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1210         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1211         free( rt );
1212 }
1213
1214 /*
1215         Look up and return the pointer to the endpoint stuct matching the given name.
1216         If not in the hash, a new endpoint is created, added to the hash. Should always
1217         return a pointer.
1218 */
1219 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1220         endpoint_t*     ep;
1221
1222         if( !rt || !ep_name || ! *ep_name ) {
1223                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1224                 errno = EINVAL;
1225                 return NULL;
1226         }
1227
1228         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1229                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1230                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1231                         errno = ENOMEM;
1232                         return NULL;
1233                 }
1234
1235                 ep->notify = 1;                                                         // show notification on first connection failure
1236                 ep->open = 0;                                                           // not connected
1237                 ep->addr = uta_h2ip( ep_name );
1238                 ep->name = strdup( ep_name );
1239                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1240                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1241
1242                 rmr_sym_put( rt->hash, ep_name, 1, ep );
1243         }
1244
1245         return ep;
1246 }
1247
1248
1249 /*
1250         Given a session id and message type build a key that can be used to look up the rte in the route
1251         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1252 */
1253 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1254         uint64_t key;
1255
1256         if( sub_id == UNSET_SUBID ) {
1257                 key = 0xffffffff00000000 | mtype;
1258         } else {
1259                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1260         }
1261
1262         return key;
1263 }
1264
1265 /*
1266         Given a route table and meid string, find the owner (if known). Returns a pointer to
1267         the endpoint struct or nil.
1268 */
1269 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
1270         endpoint_t* ep;         // the ep we found in the hash
1271
1272         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1273                 return NULL;
1274         }
1275
1276         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
1277 }
1278
1279 #endif