532a6bf63904b334028f88a300546ea105693444
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License") ;
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_generic_static.c
23         Abstract:       These are route table functions which are not specific to the
24                                 underlying protocol.  rtable_static, and rtable_nng_static
25                                 have transport provider specific code.
26
27                                 This file must be included before the nng/nano specific file as
28                                 it defines types.
29
30         Author:         E. Scott Daniels
31         Date:           5  February 2019
32 */
33
34 #ifndef rt_generic_static_c
35 #define rt_generic_static_c
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <netdb.h>
40 #include <errno.h>
41 #include <string.h>
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <unistd.h>
47 #include <netdb.h>
48 #include <pthread.h>
49
50 #include <RIC_message_types.h>          // needed for route manager messages
51
52 #define ALL 1
53 #define SOME 0
54
55 /*
56         Passed to a symtab foreach callback to construct a list of pointers from
57         a current symtab.
58 */
59 typedef struct thing_list {
60         int nalloc;
61         int nused;
62         void** things;
63         const char** names;
64 } thing_list_t;
65
66 // ---- debugging/testing -------------------------------------------------------------------------
67
68 /*
69         Dump some stats for an endpoint in the RT. This is generally called to
70         verify endpoints after a table load/change.
71
72         This is called by the for-each mechanism of the symtab and the prototype is
73         fixe; we don't really use some of the parms, but have dummy references to
74         keep sonar from complaining.
75 */
76 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
77         int*    counter;
78         endpoint_t* ep;
79
80         if( (ep = (endpoint_t *) thing) == NULL ) {
81                 return;
82         }
83
84         if( (counter = (int *) vcounter) != NULL ) {
85                 (*counter)++;
86         } else {
87                 rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
88         }
89
90         rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
91 }
92
93 /*
94         Called to count meid entries in the table. The meid points to an 'owning' endpoint
95         so we can list what we find
96
97         See note in ep_stats about dummy refs.
98 */
99 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
100         int*    counter;
101         endpoint_t* ep;
102
103         if( (ep = (endpoint_t *) thing) == NULL ) {
104                 return;
105         }
106
107         if( (counter = (int *) vcounter) != NULL ) {
108                 (*counter)++;
109         } else {
110                 rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
111         }
112
113         rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
114 }
115
116 /*
117         Dump counts for an endpoint in the RT. The vid parm is assumed to point to
118         the 'source' information and is added to each message.
119
120         See note above about dummy references.
121 */
122 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
123         endpoint_t* ep;
124         char*   id;
125
126         if( (ep = (endpoint_t *) thing) == NULL ) {
127                 rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
128                 return;
129         }
130
131         if( (id = (char *) vid) == NULL ) {
132                 id = "missing";
133         }
134
135         rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
136                 (long long) time( NULL ),
137                 id,
138                 ep->name,
139                 ep->open,
140                 ep->scounts[EPSC_GOOD],
141                 ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
142                 ep->scounts[EPSC_FAIL],
143                 ep->scounts[EPSC_TRANS]   );
144 }
145
146 /*
147         Dump stats for a route entry in the table.
148 */
149 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
150         int*    counter;
151         rtable_ent_t const* rte;                // thing is really an rte
152         int             mtype;
153         int             sid;
154
155         if( (rte = (rtable_ent_t *) thing) == NULL ) {
156                 rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
157                 return;
158         }
159
160         if( (counter = (int *) vcounter) != NULL ) {
161                 (*counter)++;
162         }
163
164         mtype = rte->key & 0xffff;
165         sid = (int) (rte->key >> 32);
166
167         rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
168 }
169
170 /*
171         Given a route table, cause some stats to be spit out.
172 */
173 static void  rt_stats( route_table_t* rt ) {
174         int* counter;
175
176         if( rt == NULL ) {
177                 rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
178                 return;
179         }
180
181         counter = (int *) malloc( sizeof( int ) );
182         *counter = 0;
183         rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
184         rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
185         rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
186         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
187
188         rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
189         *counter = 0;
190         rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
191         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
192
193         rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
194         *counter = 0;
195         rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
196         rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
197
198         free( counter );
199 }
200
201 /*
202         Given a route table, cause endpoint counters to be written to stderr. The id
203         parm is written as the "source" in the output.
204 */
205 static void  rt_epcounts( route_table_t* rt, char* id ) {
206         if( rt == NULL ) {
207                 rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
208                 return;
209         }
210
211         rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
212 }
213
214 // ------------ route manager communication -------------------------------------------------
215 /*
216         Send a request for a table update to the route manager. Updates come in
217         async, so send and go.
218
219         pctx is the private context for the thread; ctx is the application context
220         that we need to be able to send the application ID in case rt mgr needs to
221         use it to idenfity us.
222
223         Returns 0 if we were not able to send a request.
224 */
225 static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
226         rmr_mbuf_t*     smsg;
227         int     state = 0;
228
229         if( ctx->rtg_whid < 0 ) {
230                 return state;
231         }
232
233         smsg = rmr_alloc_msg( pctx, 1024 );
234         if( smsg != NULL ) {
235                 smsg->mtype = RMRRM_REQ_TABLE;
236                 smsg->sub_id = 0;
237                 snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
238                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
239                 smsg->len = strlen( smsg->payload ) + 1;
240
241                 smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
242                 if( (state = smsg->state) != RMR_OK ) {
243                         rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
244                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
245                         ctx->rtg_whid = -1;
246                 }
247
248                 rmr_free_msg( smsg );
249         }
250
251         return state;
252 }
253
254 /*
255         Send an ack to the route table manager for a table ID that we are
256         processing.      State is 1 for OK, and 0 for failed. Reason might
257         be populated if we know why there was a failure.
258
259         Context should be the PRIVATE context that we use for messages
260         to route manger and NOT the user's context.
261
262         If a message buffere is passed we use that and use return to sender
263         assuming that this might be a response to a call and that is needed
264         to send back to the proper calling thread. If msg is nil, we allocate
265         and use it.
266 */
267 static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
268         int             use_rts = 1;
269         int             payload_size = 1024;
270
271         if( ctx == NULL || ctx->rtg_whid < 0 ) {
272                 return;
273         }
274
275         if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
276                 return;
277         }
278
279         if( smsg != NULL ) {
280                 smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
281         } else {
282                 use_rts = 0;
283                 smsg = rmr_alloc_msg( ctx, payload_size );
284         }
285
286         if( smsg != NULL ) {
287                 smsg->mtype = RMRRM_TABLE_STATE;
288                 smsg->sub_id = -1;
289                 snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
290                         table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
291
292                 smsg->len = strlen( smsg->payload ) + 1;
293
294                 rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
295                 if( use_rts ) {
296                         smsg = rmr_rts_msg( ctx, smsg );
297                 } else {
298                         smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
299                 }
300                 if( (state = smsg->state) != RMR_OK ) {
301                         rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
302                         rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
303                         ctx->rtg_whid = -1;
304                 }
305
306                 if( ! use_rts ) {
307                         rmr_free_msg( smsg );                   // if not our message we must free the leftovers
308                 }
309         }
310 }
311
312 // ---- utility -----------------------------------------------------------------------------------
313 /*
314         Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
315         must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
316 */
317 static char* clip( char* buf ) {
318         char*   tok;
319
320         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
321                 buf++;
322         }
323
324         if( (tok = strchr( buf, '#' )) != NULL ) {
325                 if( tok == buf ) {
326                         return buf;                                     // just push back; leading comment sym handled there
327                 }
328
329                 if( isspace( *(tok-1) ) ) {
330                         *tok = 0;
331                 }
332         }
333
334         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
335         *(tok+1) = 0;
336
337         return buf;
338 }
339
340 /*
341         This accepts a pointer to a nil terminated string, and ensures that there is a
342         newline as the last character. If there is not, a new buffer is allocated and
343         the newline is added.  If a new buffer is allocated, the buffer passed in is
344         freed.  The function returns a pointer which the caller should use, and must
345         free.  In the event of an error, a nil pointer is returned.
346 */
347 static char* ensure_nlterm( char* buf ) {
348         char*   nb = NULL;
349         int             len = 0;
350
351         if( buf != NULL ) {
352                 len = strlen( buf );
353         }
354
355         nb = buf;                                                       // default to returning original as is
356         switch( len ) {
357                 case 0:
358                         nb = strdup( "\n" );
359                         break;
360
361                 case 1:
362                         if( *buf != '\n' ) {            // not a newline; realloc
363                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
364                                 nb = strdup( " \n" );
365                                 *nb = *buf;
366                                 free( buf );
367                         }
368                         break;
369
370                 default:
371                         if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
372                                 rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
373                                 if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
374                                         memcpy( nb, buf, len );
375                                         *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
376                                         *(nb+len+1) = 0;
377                                         free( buf );
378                                 }
379                         }
380                         break;
381         }
382
383         return nb;
384 }
385
386 /*
387         Roll the new table into the active and the active into the old table. We
388         must have the lock on the active table to do this. It's possible that there
389         is no active table (first load), so we have to account for that (no locking).
390 */
391 static void roll_tables( uta_ctx_t* ctx ) {
392         if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
393                 pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
394                 ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
395                 ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
396                 pthread_mutex_unlock( ctx->rtgate );
397         } else {
398                 ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
399                 ctx->rtable = ctx->new_rtable;                          // make new the active one
400         }
401
402         ctx->new_rtable = NULL;
403 }
404
405 // ------------ entry update functions ---------------------------------------------------------------
406 /*
407         Given a message type create a route table entry and add to the hash keyed on the
408         message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
409         is the number of group slots to allocate in the entry.
410 */
411 static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
412         rtable_ent_t* rte;
413         rtable_ent_t* old_rte;          // entry which was already in the table for the key
414
415         if( rt == NULL ) {
416                 return NULL;
417         }
418
419         if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
420                 rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
421                 return NULL;
422         }
423         memset( rte, 0, sizeof( *rte ) );
424         rte->refs = 1;
425         rte->key = key;
426
427         if( nrrgroups < 0 ) {           // zero is allowed as %meid entries have no groups
428                 nrrgroups = 10;
429         }
430
431         if( nrrgroups ) {
432                 if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
433                         free( rte );
434                         return NULL;
435                 }
436                 memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
437         } else {
438                 rte->rrgroups = NULL;
439         }
440
441         rte->nrrgroups = nrrgroups;
442
443         if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
444                 del_rte( NULL, NULL, NULL, old_rte, NULL );                             // dec the ref counter and trash if unreferenced
445         }
446
447         rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
448
449         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
450         return rte;
451 }
452
453 /*
454         This accepts partially parsed information from an rte or mse record sent by route manager or read from
455         a file such that:
456                 ts_field is the msg-type,sender field
457                 subid is the integer subscription id
458                 rr_field is the endpoint information for round robening message over
459
460         If all goes well, this will add an RTE to the table under construction.
461
462         The ts_field is checked to see if we should ingest this record. We ingest if one of
463         these is true:
464                 there is no sender info (a generic entry for all)
465                 there is sender and our host:port matches one of the senders
466                 the sender info is an IP address that matches one of our IP addresses
467 */
468 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
469         rtable_ent_t*   rte;            // route table entry added
470         char const*     tok;
471         int             ntoks;
472         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
473         char*   tokens[128];
474         char*   gtokens[64];
475         int             i;
476         int             ngtoks;                         // number of tokens in the group list
477         int             grp;                            // index into group list
478
479         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
480         rr_field = clip( rr_field );
481
482         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
483                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
484                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
485
486                 key = build_rt_key( subid, atoi( ts_field ) );
487
488                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
489
490                 if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
491                         if( strcmp( gtokens[0], "%meid" ) == 0 ) {
492                                 ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
493                         }
494                         rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
495                         rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
496
497                         for( grp = 0; grp < ngtoks; grp++ ) {
498                                 if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
499                                         for( i = 0; i < ntoks; i++ ) {
500                                                 if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
501                                                         if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
502                                                         uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
503                                                 }
504                                         }
505                                 }
506                         }
507                 }
508         } else {
509                 if( DEBUG || (vlevel > 2) ) {
510                         rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
511                 }
512         }
513 }
514
515 /*
516         Trash_entry takes a partially parsed record from the input and
517         will delete the entry if the sender,mtype matches us or it's a
518         generic mtype. The refernce in the new table is removed and the
519         refcounter for the actual rte is decreased. If that ref count is
520         0 then the memory is freed (handled byh the del_rte call).
521 */
522 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
523         rtable_ent_t*   rte;            // route table entry to be 'deleted'
524         char const*     tok;
525         int             ntoks;
526         uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
527         char*   tokens[128];
528
529         if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
530                 return;
531         }
532
533         ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
534
535         if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
536                 (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
537                 has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
538
539                 key = build_rt_key( subid, atoi( ts_field ) );
540                 rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
541                 if( rte != NULL ) {
542                         if( DEBUG || (vlevel > 1) ) {
543                                  rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
544                         }
545                         rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
546                         del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
547                 } else {
548                         if( DEBUG || (vlevel > 1) ) {
549                                 rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
550                         }
551                 }
552         } else {
553                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
554         }
555 }
556
557 // -------------------------- parse functions --------------------------------------------------
558
559 /*
560         Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
561         the 'owner' which should be the dns name or IP address of an enpoint
562         the meid_list is a space separated list of me IDs
563
564         This function assumes the caller has vetted the pointers as needed.
565
566         For each meid in the list, an entry is pushed into the hash which references the owner
567         endpoint such that when the meid is used to route a message it references the endpoint
568         to send messages to.
569 */
570 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
571         char const*     tok;
572         int             ntoks;
573         char*   tokens[128];
574         int             i;
575         int             state;
576         endpoint_t*     ep;                                             // endpoint struct for the owner
577
578         owner = clip( owner );                          // ditch extra whitespace and trailing comments
579         meid_list = clip( meid_list );
580
581         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
582         for( i = 0; i < ntoks; i++ ) {
583                 if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
584                         state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
585                         if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
586                 } else {
587                         rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
588                 }
589         }
590 }
591
592 /*
593         Given the tokens from an mme_del, delete the listed meid entries from the new
594         table. The list is a space separated list of meids.
595
596         The meids in the hash reference endpoints which are never deleted and so
597         the only thing that we need to do here is to remove the meid from the hash.
598
599         This function assumes the caller has vetted the pointers as needed.
600 */
601 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
602         char const*     tok;
603         int             ntoks;
604         char*   tokens[128];
605         int             i;
606
607         if( rtab->hash == NULL ) {
608                 return;
609         }
610
611         meid_list = clip( meid_list );
612
613         ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
614         for( i = 0; i < ntoks; i++ ) {
615                 rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
616                 if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
617         }
618 }
619
620 /*
621         Parse a partially parsed meid record. Tokens[0] should be one of:
622                 meid_map, mme_ar, mme_del.
623
624         pctx is the private context needed to return an ack/nack using the provided
625         message buffer with the route managers address info.
626 */
627 static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
628         char wbuf[1024];
629
630         if( tokens == NULL || ntoks < 1 ) {
631                 return;                                                 // silent but should never happen
632         }
633
634         if( ntoks < 2 ) {                                       // must have at least two for any valid request record
635                 rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
636                 return;
637         }
638
639         if( strcmp( tokens[0], "meid_map" ) == 0 ) {                                    // start or end of the meid map update
640                 tokens[1] = clip( tokens[1] );
641                 if( *(tokens[1]) == 's' ) {
642                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
643                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
644                                 uta_rt_drop( ctx->new_rtable );
645                                 ctx->new_rtable = NULL;
646                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
647                         }
648
649                         if( ctx->table_id != NULL ) {
650                                 free( ctx->table_id );
651                         }
652                         if( ntoks > 2 ) {
653                                 ctx->table_id = strdup( clip( tokens[2] ) );
654                         } else {
655                                 ctx->table_id = NULL;
656                         }
657
658                         ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
659                         ctx->new_rtable->mupdates = 0;
660
661                         if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
662                 } else {
663                         if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
664                                 if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
665                                         if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
666                                                 rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
667                                                                 ctx->new_rtable->mupdates, tokens[2] );
668                                                 snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
669                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
670                                                 uta_rt_drop( ctx->new_rtable );
671                                                 ctx->new_rtable = NULL;
672                                                 return;
673                                         }
674
675                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
676                                 }
677
678                                 if( ctx->new_rtable ) {
679                                         roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
680                                         if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
681                                         send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
682
683                                         if( vlevel > 0 ) {
684                                                 if( ctx->old_rtable != NULL ) {
685                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
686                                                         rt_stats( ctx->old_rtable );
687                                                 } else {
688                                                         rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
689                                                 }
690                                                 rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
691                                                 rt_stats( ctx->rtable );
692                                         }
693                                 } else {
694                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
695                                         ctx->new_rtable = NULL;
696                                 }
697                         }
698                 }
699
700                 return;
701         }
702
703         if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
704                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
705                 return;
706         }
707
708         if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
709                 if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
710                         rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
711                         return;
712                 }
713                 parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
714                 ctx->new_rtable->mupdates++;
715                 return;
716         }
717
718         if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
719                 parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
720                 ctx->new_rtable->mupdates++;
721                 return;
722         }
723 }
724
725 /*
726         Parse a single record recevied from the route table generator, or read
727         from a static route table file.  Start records cause a new table to
728         be started (if a partial table was received it is discarded. Table
729         entry records are added to the currenly 'in progress' table, and an
730         end record causes the in progress table to be finalised and the
731         currently active table is replaced.
732
733         The updated table will be activated when the *|end record is encountered.
734         However, to allow for a "double" update, where both the meid map and the
735         route table must be updated at the same time, the end indication on a
736         route table (new or update) may specifiy "hold" which indicates that meid
737         map entries are to follow and the updated route table should be held as
738         pending until the end of the meid map is received and validated.
739
740         CAUTION:  we are assuming that there is a single route/meid map generator
741                 and as such only one type of update is received at a time; in other
742                 words, the sender cannot mix update records and if there is more than
743                 one sender process they must synchronise to avoid issues.
744
745
746         For a RT update, we expect:
747                 newrt | start | <table-id>
748                 newrt | end | <count>
749                 rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
750                 mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
751                 mse| <mtype>[,sender] | <sub-id> | %meid
752
753
754         For a meid map update we expect:
755                 meid_map | start | <table-id>
756                 meid_map | end | <count> | <md5-hash>
757                 mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
758                 mme_del | <meid0> <meid1>...<meidn>
759
760
761         The pctx is our private context that must be used to send acks/status
762         messages back to the route manager.  The regular ctx is the ctx that
763         the user has been given and thus that's where we have to hang the route
764         table we're working with.
765
766         If mbuf is given, and we need to ack, then we ack using the mbuf and a
767         return to sender call (allows route manager to use wh_call() to send
768         an update and rts is required to get that back to the right thread).
769         If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
770         will be used.
771 */
772 static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
773         int i;
774         int ntoks;                                                      // number of tokens found in something
775         int ngtoks;
776         int     grp;                                                    // group number
777         rtable_ent_t const*     rte;                    // route table entry added
778         char*   tokens[128];
779         char*   tok;                                            // pointer into a token or string
780         char    wbuf[1024];
781
782         if( ! buf ) {
783                 return;
784         }
785
786         while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
787                 buf++;
788         }
789         for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
790         *(tok+1) = 0;
791
792         memset( tokens, 0, sizeof( tokens ) );
793         if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
794                 tokens[0] = clip( tokens[0] );
795                 switch( *(tokens[0]) ) {
796                         case 0:                                                                                                 // ignore blanks
797                                 // fallthrough
798                         case '#':                                                                                               // and comment lines
799                                 break;
800
801                         case 'd':                                                                                               // del | [sender,]mtype | sub-id
802                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
803                                         break;
804                                 }
805
806                                 if( ntoks < 3 ) {
807                                         if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
808                                         break;
809                                 }
810
811                                 trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel );
812                                 ctx->new_rtable->updates++;
813                                 break;
814
815                         case 'n':                                                                                               // newrt|{start|end}
816                                 tokens[1] = clip( tokens[1] );
817                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
818                                         if( ntoks >2 ) {
819                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
820                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
821                                                                 ctx->new_rtable->updates, tokens[2] );
822                                                         snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
823                                                         send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
824                                                         uta_rt_drop( ctx->new_rtable );
825                                                         ctx->new_rtable = NULL;
826                                                         break;
827                                                 }
828                                         }
829
830                                         if( ctx->new_rtable ) {
831                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
832                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
833
834                                                 if( vlevel > 0 ) {
835                                                         if( ctx->old_rtable != NULL ) {
836                                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
837                                                                 rt_stats( ctx->old_rtable );
838                                                         } else {
839                                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
840                                                         }
841                                                         rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
842                                                         rt_stats( ctx->rtable );
843                                                 }
844
845                                                 send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
846                                                 ctx->rtable_ready = 1;                                                  // route based sends can now happen
847                                         } else {
848                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
849                                                 ctx->new_rtable = NULL;
850                                         }
851                                 } else {                                                                                                                        // start a new table.
852                                         if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
853                                                 send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
854
855                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
856                                                 uta_rt_drop( ctx->new_rtable );
857                                                 ctx->new_rtable = NULL;
858                                         }
859
860                                         if( ctx->table_id != NULL ) {
861                                                 free( ctx->table_id );
862                                         }
863                                         if( ntoks >2 ) {
864                                                 ctx->table_id = strdup( clip( tokens[2] ) );
865                                         } else {
866                                                 ctx->table_id = NULL;
867                                         }
868
869                                         ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
870                                         ctx->new_rtable->updates = 0;                                           // init count of entries received
871
872                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
873                                 }
874                                 break;
875
876                         case 'm':                                                                       // mse entry or one of the meid_ records
877                                 if( strcmp( tokens[0], "mse" ) == 0 ) {
878                                         if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
879                                                 break;
880                                         }
881
882                                         if( ntoks < 4 ) {
883                                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
884                                                 break;
885                                         }
886
887                                         build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
888                                         ctx->new_rtable->updates++;
889                                 } else {
890                                         meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
891                                 }
892                                 break;
893
894                         case 'r':                                       // assume rt entry
895                                 if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
896                                         break;
897                                 }
898
899                                 ctx->new_rtable->updates++;
900                                 if( ntoks > 3 ) {                                                                                                       // assume new entry with subid last
901                                         build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
902                                 } else {
903                                         build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel );                  // old school entry has no sub id
904                                 }
905                                 break;
906
907                         case 'u':                                                                                               // update current table, not a total replacement
908                                 tokens[1] = clip( tokens[1] );
909                                 if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
910                                         if( ctx->new_rtable == NULL ) {                                 // update table not in progress
911                                                 break;
912                                         }
913
914                                         if( ntoks >2 ) {
915                                                 if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
916                                                         rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
917                                                                 ctx->new_rtable->updates, tokens[2] );
918                                                         uta_rt_drop( ctx->new_rtable );
919                                                         ctx->new_rtable = NULL;
920                                                         break;
921                                                 }
922                                         }
923
924                                         if( ctx->new_rtable ) {
925                                                 roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
926                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
927
928                                                 if( vlevel > 0 ) {
929                                                         if( ctx->old_rtable != NULL ) {
930                                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table:  (ref_count=%d)\n", ctx->old_rtable->ref_count );
931                                                                 rt_stats( ctx->old_rtable );
932                                                         } else {
933                                                                 rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
934                                                         }
935                                                         rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
936                                                         rt_stats( ctx->rtable );
937                                                 }
938                                         } else {
939                                                 if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
940                                                 ctx->new_rtable = NULL;
941                                         }
942                                 } else {                                                                                        // start a new table.
943                                         if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
944                                                 if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
945                                                 uta_rt_drop( ctx->new_rtable );
946                                                 ctx->new_rtable = NULL;
947                                         }
948
949                                         if( ntoks > 2 ) {
950                                                 if( ctx->table_id != NULL ) {
951                                                         free( ctx->table_id );
952                                                 }
953                                                 ctx->table_id = strdup( clip( tokens[2] ) );
954                                         }
955
956                                         ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
957                                         ctx->new_rtable->updates = 0;                                                   // init count of updates received
958
959                                         if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
960                                 }
961                                 break;
962
963                         default:
964                                 if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
965                                 break;
966                 }
967         }
968 }
969
970 /*
971         This function attempts to open a static route table in order to create a 'seed'
972         table during initialisation.  The environment variable RMR_SEED_RT is expected
973         to contain the necessary path to the file. If missing, or if the file is empty,
974         no route table will be available until one is received from the generator.
975
976         This function is probably most useful for testing situations, or extreme
977         cases where the routes are static.
978 */
979 static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
980         int             i;
981         char*   fname;
982         char*   fbuf;                           // buffer with file contents
983         char*   rec;                            // start of the record
984         char*   eor;                            // end of the record
985         int             rcount = 0;                     // record count for debug
986
987         if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
988                 return;
989         }
990
991         if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
992                 rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
993                 return;
994         }
995
996         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
997         for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
998                 if( *eor == '\r' ) {
999                         *eor = '\n';                                                            // will look like a blank line which is ok
1000                 }
1001         }
1002
1003         rec = fbuf;
1004         while( rec && *rec ) {
1005                 rcount++;
1006                 if( (eor = strchr( rec, '\n' )) != NULL ) {
1007                         *eor = 0;
1008                 } else {
1009                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
1010                         rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
1011                         free( fbuf );
1012                         return;
1013                 }
1014
1015                 parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
1016
1017                 rec = eor+1;
1018         }
1019
1020         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
1021         free( fbuf );
1022 }
1023
1024 /*
1025         Callback driven for each named thing in a symtab. We collect the pointers to those
1026         things for later use (cloning).
1027 */
1028 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
1029         thing_list_t*   tl;
1030
1031         if( (tl = (thing_list_t *) vthing_list) == NULL ) {
1032                 return;
1033         }
1034
1035         if( thing == NULL ) {
1036                 rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1037                 return;
1038         }
1039
1040         tl->names[tl->nused] = name;                    // the name/key
1041         tl->things[tl->nused++] = thing;                // save a reference to the thing
1042 }
1043
1044 /*
1045         Called to delete a route table entry struct. We delete the array of endpoint
1046         pointers, but NOT the endpoints referenced as those are referenced from
1047         multiple entries.
1048
1049         Route table entries can be concurrently referenced by multiple symtabs, so
1050         the actual delete happens only if decrementing the rte's ref count takes it
1051         to 0. Thus, it is safe to call this function across a symtab when cleaning up
1052         the symtab, or overlaying an entry.
1053
1054         This function uses ONLY the pointer to the rte (thing) and ignores the other
1055         information that symtab foreach function passes (st, entry, and data) which
1056         means that it _can_ safetly be used outside of the foreach setting. If
1057         the function is changed to depend on any of these three, then a stand-alone
1058         rte_cleanup() function should be added and referenced by this, and refererences
1059         to this outside of the foreach world should be changed.
1060 */
1061 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) {
1062         rtable_ent_t*   rte;
1063         int i;
1064
1065         if( (rte = (rtable_ent_t *) thing) == NULL ) {
1066                 rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
1067                 return;
1068         }
1069
1070         rte->refs--;
1071         if( rte->refs > 0 ) {                   // something still referencing, so it lives
1072                 return;
1073         }
1074
1075         if( rte->rrgroups ) {                                                                   // clean up the round robin groups
1076                 for( i = 0; i < rte->nrrgroups; i++ ) {
1077                         if( rte->rrgroups[i] ) {
1078                                 free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
1079                         }
1080                 }
1081
1082                 free( rte->rrgroups );
1083         }
1084
1085         free( rte );                                                                                    // finally, drop the potato
1086 }
1087
1088 /*
1089         Read an entire file into a buffer. We assume for route table files
1090         they will be smallish and so this won't be a problem.
1091         Returns a pointer to the buffer, or nil. Caller must free.
1092         Terminates the buffer with a nil character for string processing.
1093
1094         If we cannot stat the file, we assume it's empty or missing and return
1095         an empty buffer, as opposed to a nil, so the caller can generate defaults
1096         or error if an empty/missing file isn't tolerated.
1097 */
1098 static char* uta_fib( char const* fname ) {
1099         struct stat     stats;
1100         off_t           fsize = 8192;   // size of the file
1101         off_t           nread;                  // number of bytes read
1102         int                     fd;
1103         char*           buf;                    // input buffer
1104
1105         if( (fd = open( fname, O_RDONLY )) >= 0 ) {
1106                 if( fstat( fd, &stats ) >= 0 ) {
1107                         if( stats.st_size <= 0 ) {                                      // empty file
1108                                 close( fd );
1109                                 fd = -1;
1110                         } else {
1111                                 fsize = stats.st_size;                                          // stat ok, save the file size
1112                         }
1113                 } else {
1114                         fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
1115                 }
1116         }
1117
1118         if( fd < 0 ) {                                                                                  // didn't open or empty
1119                 if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
1120                         return NULL;
1121                 }
1122
1123                 *buf = 0;
1124                 return buf;
1125         }
1126
1127         // add a size limit check here
1128
1129         if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) {           // enough to add nil char to make string
1130                 close( fd );
1131                 errno = ENOMEM;
1132                 return NULL;
1133         }
1134
1135         nread = read( fd, buf, fsize );
1136         if( nread < 0 || nread > fsize ) {                                                      // failure of some kind
1137                 free( buf );
1138                 errno = EFBIG;                                                                                  // likely too much to handle
1139                 close( fd );
1140                 return NULL;
1141         }
1142
1143         buf[nread] = 0;
1144
1145         close( fd );
1146         return buf;
1147 }
1148
1149 // --------------------- initialisation/creation ---------------------------------------------
1150 /*
1151         Create and initialise a route table; Returns a pointer to the table struct.
1152 */
1153 static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
1154         route_table_t*  rt;
1155
1156         if( ctx == NULL ) {
1157                 return NULL;
1158         }
1159         if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
1160                 return NULL;
1161         }
1162
1163         memset( rt, 0, sizeof( *rt ) );
1164
1165         if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
1166                 free( rt );
1167                 return NULL;
1168         }
1169
1170         rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
1171         rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
1172         pthread_mutex_init( rt->gate, NULL );
1173
1174         return rt;
1175 }
1176
1177 /*
1178         Clones one of the spaces in the given table.
1179         Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
1180         Space is the space in the old table to copy. Space 0 uses an integer key and
1181         references rte structs. All other spaces use a string key and reference endpoints.
1182 */
1183 static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
1184         endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
1185         rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
1186         void*   sst;                    // source symtab
1187         void*   nst;                    // new symtab
1188         thing_list_t things;    // things from the space to copy
1189         int             i;
1190         int             free_on_err = 0;
1191
1192         if( ctx == NULL ) {
1193                 return NULL;
1194         }
1195         if( nrt == NULL ) {                             // make a new table if needed
1196                 free_on_err = 1;
1197                 nrt = uta_rt_init( ctx );
1198                 if( nrt == NULL ) {
1199                         return NULL;
1200                 }
1201         }
1202
1203         if( srt == NULL ) {             // source was nil, just give back the new table
1204                 return nrt;
1205         }
1206
1207         things.nalloc = 2048;
1208         things.nused = 0;
1209         things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
1210         memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
1211         things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
1212         memset( things.names, 0, sizeof( char * ) * things.nalloc );
1213         if( things.things == NULL ) {
1214                 if( free_on_err ) {
1215                         rmr_sym_free( nrt->hash );
1216                         free( nrt );
1217                         nrt = NULL;
1218                 }
1219
1220                 return nrt;
1221         }
1222
1223         sst = srt->hash;                                                                                        // convenience pointers (src symtab)
1224         nst = nrt->hash;
1225
1226         rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
1227
1228         if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
1229         for( i = 0; i < things.nused; i++ ) {
1230                 if( space ) {                                                                                           // string key, epoint reference
1231                         ep = (endpoint_t *) things.things[i];
1232                         rmr_sym_put( nst, things.names[i], space, ep );                                 // slam this one into the new table
1233                 } else {
1234                         rte = (rtable_ent_t *) things.things[i];
1235                         rte->refs++;                                                                                    // rtes can be removed, so we track references
1236                         rmr_sym_map( nst, rte->key, rte );                                              // add to hash using numeric mtype/sub-id as key (default to space 0)
1237                 }
1238         }
1239
1240         free( things.things );
1241         free( (void *) things.names );
1242         return nrt;
1243 }
1244
1245 /*
1246         Given a destination route table (drt), clone from the source (srt) into it.
1247         If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
1248         allocate the drt if that was nil too). If all is true (1), then we will clone both
1249         the MT and the ME spaces; otherwise only the ME space is cloned.
1250 */
1251 static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
1252         endpoint_t*             ep;                             // an endpoint
1253         rtable_ent_t*   rte;                    // a route table entry
1254         int i;
1255
1256         if( ctx == NULL ) {
1257                 return NULL;
1258         }
1259         if( drt == NULL ) {
1260                 drt = uta_rt_init( ctx );
1261         }
1262         if( srt == NULL ) {
1263                 return drt;
1264         }
1265
1266         drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
1267         rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
1268         if( all ) {
1269                 rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
1270         }
1271
1272         return drt;
1273 }
1274
1275 /*
1276         Prepares the "new" route table for populating. If the old_rtable is not nil, then
1277         we wait for it's use count to reach 0. Then the table is cleared, and moved on the
1278         context to be referenced by the new pointer; the old pointer is set to nil.
1279
1280         If the old table doesn't exist, then a new table is created and the new pointer is
1281         set to reference it.
1282 */
1283 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
1284         int counter = 0;
1285         route_table_t*  rt;
1286
1287         if( ctx == NULL ) {
1288                 return NULL;
1289         }
1290
1291         if( (rt = ctx->old_rtable) != NULL ) {
1292                 ctx->old_rtable = NULL;
1293                 while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
1294                         if( counter++ > 1000 ) {
1295                                 rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
1296                                 break;
1297                         }
1298
1299                         usleep( 1000 );                                         // small sleep to yield the processer if that is needed
1300                 }
1301
1302                 rmr_sym_clear( rt );                                    // clear all entries from the old table
1303         } else {
1304                 rt = NULL;
1305         }
1306
1307         rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
1308         rt->ref_count = 0;                                                      // take no chances; ensure it's 0!
1309
1310         return rt;
1311 }
1312
1313
1314 /*
1315         Given a name, find the endpoint struct in the provided route table.
1316 */
1317 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
1318
1319         if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
1320                 return NULL;
1321         }
1322
1323         return rmr_sym_get( rt->ephash, ep_name, 1 );
1324 }
1325
1326 /*
1327         Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
1328         Does NOT destroy the gate as it's a common gate for ALL route tables.
1329 */
1330 static void uta_rt_drop( route_table_t* rt ) {
1331         if( rt == NULL ) {
1332                 return;
1333         }
1334
1335         rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // free each rte referenced by the hash, but NOT the endpoints
1336         rmr_sym_free( rt->hash );                                                                       // free all of the hash related data
1337         free( rt );
1338 }
1339
1340 /*
1341         Look up and return the pointer to the endpoint stuct matching the given name.
1342         If not in the hash, a new endpoint is created, added to the hash. Should always
1343         return a pointer.
1344 */
1345 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
1346         endpoint_t*     ep;
1347
1348         if( !rt || !ep_name || ! *ep_name ) {
1349                 rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
1350                 errno = EINVAL;
1351                 return NULL;
1352         }
1353
1354         if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
1355                 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
1356                         rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
1357                         errno = ENOMEM;
1358                         return NULL;
1359                 }
1360
1361                 ep->notify = 1;                                                         // show notification on first connection failure
1362                 ep->open = 0;                                                           // not connected
1363                 ep->addr = uta_h2ip( ep_name );
1364                 ep->name = strdup( ep_name );
1365                 pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
1366                 memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
1367
1368                 rmr_sym_put( rt->ephash, ep_name, 1, ep );
1369         }
1370
1371         return ep;
1372 }
1373
1374
1375 /*
1376         Given a session id and message type build a key that can be used to look up the rte in the route
1377         table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
1378 */
1379 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
1380         uint64_t key;
1381
1382         if( sub_id == UNSET_SUBID ) {
1383                 key = 0xffffffff00000000 | mtype;
1384         } else {
1385                 key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
1386         }
1387
1388         return key;
1389 }
1390
1391 /*
1392         Given a route table and meid string, find the owner (if known). Returns a pointer to
1393         the endpoint struct or nil.
1394 */
1395 static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
1396         endpoint_t const* ep;           // the ep we found in the hash
1397
1398         if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
1399                 return NULL;
1400         }
1401
1402         return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
1403 }
1404
1405 /*
1406         This returns a pointer to the currently active route table and ups
1407         the reference count so that the route table is not freed while it
1408         is being used. The caller MUST call release_rt() when finished
1409         with the pointer.
1410
1411         Care must be taken: the ctx->rtable pointer _could_ change during the time
1412         between the release of the lock and the return. Therefore we MUST grab
1413         the current pointer when we have the lock so that if it does we don't
1414         return a pointer to the wrong table.
1415
1416         This will return NULL if there is no active table.
1417 */
1418 static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
1419         route_table_t*  rrt;                    // return value
1420
1421         if( ctx == NULL || ctx->rtable == NULL ) {
1422                 return NULL;
1423         }
1424
1425         pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
1426         rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
1427         rrt->ref_count++;
1428         pthread_mutex_unlock( ctx->rtgate );
1429
1430         return rrt;                                                                             // pointer we upped the count with
1431 }
1432
1433 /*
1434         This will "release" the route table by reducing the use counter
1435         in the table. The table may not be freed until the counter reaches
1436         0, so it's imparative that the pointer be "released" when it is
1437         fetched by get_rt().  Once the caller has released the table it
1438         may not safely use the pointer that it had.
1439 */
1440 static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
1441         if( ctx == NULL || rt == NULL ) {
1442                 return;
1443         }
1444
1445         pthread_mutex_lock( ctx->rtgate );                              // must hold lock
1446         if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
1447                 rt->ref_count--;
1448         }
1449         pthread_mutex_unlock( ctx->rtgate );
1450 }
1451 #endif