1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtable_nng_static.c
23 Abstract: Route table management functions which depend on the underlying
24 transport mechanism and thus cannot be included with the generic
25 route table functions.
27 This module is designed to be included by any module (main) needing
28 the static/private stuff.
30 Author: E. Scott Daniels
31 Date: 29 November 2018
34 #ifndef rtable_static_c
35 #define rtable_static_c
44 #include <sys/types.h>
49 // -----------------------------------------------------------------------------------------------------
52 Establish a TCP connection to the indicated target (IP address).
53 Target assumed to be address:port. The new socket is returned via the
54 user supplied pointer so that a success/fail code is returned directly.
55 Return value is 0 (false) on failure, 1 (true) on success.
57 In order to support multi-threaded user applications we must hold a lock before
58 we attempt to create a dialer and connect. NNG is thread safe, but we can
59 get things into a bad state if we allow a collision here. The lock grab
60 only happens on the initial session setup.
// uta_link2: open a push socket for the endpoint and dial tcp://<ep->name>.
// On the success path shown here ep->open is set to TRUE while ep->gate is still
// held; each failure branch shown after the lock is taken releases ep->gate first.
// NOTE(review): only part of this function is visible in this listing; local
// declarations and return statements are not shown.
62 static int uta_link2( endpoint_t* ep ) {
63 static int flags = -1; // dialer start flags; static, so the value persists across calls
68 char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
78 tok = getenv( "RMR_ASYNC_CONN" ); // environment switch controlling asynchronous dialing
79 if( tok == NULL || *tok == '1' ) { // variable unset, or its first character is '1'
80 flags = NNG_FLAG_NONBLOCK; // start dialer asynch
86 target = ep->name; // always give name to transport so changing dest IP does not break reconnect
87 nn_sock = &ep->nn_sock; // the socket is stored inside the endpoint itself
90 if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
91 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
95 if( nn_sock == NULL ) { // NOTE(review): nn_sock is the address of a member of ep, so this looks unreachable for a valid ep -- confirm
100 pthread_mutex_lock( &ep->gate ); // grab the lock
102 pthread_mutex_unlock( &ep->gate ); // NOTE(review): the condition guarding this early release is not visible here
107 if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
108 pthread_mutex_unlock( &ep->gate ); // release the gate before reporting the failure
109 rmr_vlog( RMR_VL_CRIT, "rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
113 snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
114 if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) { // nng status is kept in state
115 pthread_mutex_unlock( &ep->gate ); // release the gate before reporting the failure
116 rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); // NOTE(review): reports errno although the nng status is in state; the start failure below reports nng_strerror( state )
117 nng_close( *nn_sock ); // drop the socket opened above since no dialer could be attached
121 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMAXT, 2000 ); // cap backoff on retries to reasonable amount (2s)
122 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100ms after last failure with 2s cap
124 if( (state = nng_dialer_start( *dialer, flags )) != 0 ) { // can fail immediately (unlike nanomsg)
125 pthread_mutex_unlock( &ep->gate ); // release the gate before reporting the failure
126 rmr_vlog( RMR_VL_WARN, "rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
127 nng_close( *nn_sock ); // drop the socket opened above since the dial could not be started
131 if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_link2l: dial was successful: %s\n", target );
133 ep->open = TRUE; // must set before release
134 pthread_mutex_unlock( &ep->gate ); // success path: gate released only after open is flagged
139 This provides a protocol independent mechanism for establishing the connection to an endpoint.
140 Return is true (1) if the link was opened; false on error.
142 For some flavours, the context is needed by this function, but not for nng.
// rt_link2_ep: protocol independent entry point for connecting an endpoint.
// NOTE(review): only the opening lines are visible in this listing; the use (if any)
// of vctx and the return statements are not shown.
144 static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
149 if( ep->open ) { // already open, do nothing
159 Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
160 hash we add it and create the endpoint struct.
162 The caller must supply the specific route table (we assume it will be pending, but they
163 could live on the edge and update the active one, though that's not at all a good idea).
// uta_add_ep: place the endpoint named ep_name into round robin group 'group' of rte.
// When the group slot is empty a new group is allocated with room for MAX_EP_GROUP
// entries; the endpoint itself is obtained from rt_ensure_ep().
// NOTE(review): return statements and some local declarations are not visible in
// this listing.
165 extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ) {
167 rrgroup_t* rrg; // pointer at group to update
169 if( ! rte || ! rt ) { // both the table and the entry are required
170 rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" ); // NOTE(review): message prefix reads "uda_add_ep"; the function is uta_add_ep
174 if( rte->nrrgroups <= group ) { // group index is past what the entry holds; NOTE(review): a negative group is not rejected by this test
175 rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
179 if( (rrg = rte->rrgroups[group]) == NULL ) { // group does not exist yet; build it
180 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
181 rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
184 memset( rrg, 0, sizeof( *rrg ) ); // start from an all-zero group
186 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) { // NOTE(review): sized with sizeof( endpoint_t ) although the elements are endpoint_t*; presumably larger than needed -- confirm intent
187 rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
190 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP ); // clears the same byte count that was allocated
192 rte->rrgroups[group] = rrg; // hang the new group on the entry
194 rrg->ep_idx = 0; // next endpoint to send to
195 rrg->nused = 0; // number populated
196 rrg->nendpts = MAX_EP_GROUP; // number allocated
199 ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
202 if( rrg->nused >= rrg->nendpts ) { // no free slot left in the group
203 // future: reallocate
204 rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
208 rrg->epts[rrg->nused] = ep; // store in the next unused slot
212 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
218 Given a name, find the nano socket needed to send to it. Returns the socket via
219 the user pointer passed in and sets the return value to true (1). If the
220 endpoint cannot be found false (0) is returned.
// uta_epsock_byname: look the endpoint up by name in the table hash (creating it via
// rt_ensure_ep() when the name is supplied but not found), connect it with uta_link2()
// when it is not open, and hand its socket back through nn_sock.
// NOTE(review): branch structure and return statements are only partly visible in
// this listing.
222 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) {
230 ep = rmr_sym_get( rt->hash, ep_name, 1 ); // first try: existing entry in the table hash
231 if( uepp != NULL ) { // caller needs endpoint too, give it back
235 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name );
236 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
241 if( ! ep->open ) { // not open -- connect now
242 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name );
243 if( ep->addr == NULL ) { // name didn't resolve before, try again
244 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
246 if( uta_link2( ep ) ) { // find entry in table and create link
249 *nn_sock = ep->nn_sock; // pass socket back to caller
251 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
253 *nn_sock = ep->nn_sock; // presumably the already-open path -- enclosing branch is not visible here
261 Make a round robin selection within a round robin group for a route table
262 entry. Returns the nanomsg socket if there is a rte for the message
263 key, and group is defined. Socket is returned via pointer in the parm
266 The group is the group number to select from.
268 The user supplied (via pointer to) integer 'more' will be set if there are
269 additional groups beyond the one selected. This allows the caller to
270 easily iterate over the group list -- more is set when the group should
271 be incremented and the function invoked again. Groups start at 0.
273 The return value is true (>0) if the socket was found and *nn_sock was updated
274 and false (0) if there is no associated socket for the msg type, group combination.
275 We return the index+1 from the round robin table on success so that we can verify
276 during test that different entries are being selected; we cannot depend on the nng
277 socket being different as we could with nano.
279 NOTE: The round robin selection index increment might collide with other
280 threads if multiple threads are attempting to send to the same round
281 robin group; the consequences are small and avoid locking. The only side
282 effect is either sending two messages in a row to, or skipping, an endpoint.
283 Both of these, in the grand scheme of things, are minor compared to the
284 overhead of grabbing a lock on each call.
// uta_epsock_rr: pick an endpoint from round robin group 'group' of rte, connect it
// with uta_link2() when it is not open, and hand its socket back through nn_sock.
// *more is set from whether the following group slot is populated.
// NOTE(review): several branches, local declarations and the return statements are
// not visible in this listing.
286 static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) {
287 endpoint_t* ep; // selected end point
288 int state = FALSE; // processing state
294 if( ! more ) { // eliminate checks each time we need to use
298 if( ! nn_sock ) { // user didn't supply a pointer
309 if( group < 0 || group >= rte->nrrgroups ) { // group index outside the entry's range
310 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
315 if( (rrg = rte->rrgroups[group]) == NULL ) { // nothing defined for this group
316 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
317 *more = 0; // groups are inserted contig, so nothing should be after a nil pointer
321 *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
323 switch( rrg->nused ) { // choice depends on how many endpoints are populated
324 case 0: // nothing allocated, just punt
325 //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
328 case 1: // exactly one, no rr to deal with
330 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
334 default: // need to pick one and adjust rr counts
336 idx = rrg->ep_idx++ % rrg->nused; // unlocked post-increment of the shared index, wrapped to the populated count
337 ep = rrg->epts[idx]; // select next endpoint
338 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
339 state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
343 if( uepp != NULL ) { // caller needs reference to endpoint too
346 if( state ) { // end point selected, open if not, get socket either way
347 if( ! ep->open ) { // not connected
348 if( ep->addr == NULL ) { // name didn't resolve before, try again
349 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
352 if( uta_link2( ep ) ) { // find entry in table and create link
354 *nn_sock = ep->nn_sock; // pass socket back to caller
358 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
360 *nn_sock = ep->nn_sock; // presumably the already-open path -- enclosing branch is not visible here
368 Finds the rtable entry which matches the key. Returns a nil pointer if
369 no entry is found. If try_alt is set, then we will attempt
370 to find the entry with a key based only on the message type.
// uta_get_rte: find the route table entry keyed by (sid, mtype). When that lookup
// misses, try_alt is set, and sid is not UNSET_SUBID, a second lookup is made with a
// key built from UNSET_SUBID and the same mtype.
// NOTE(review): return statements are not visible in this listing.
372 static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
373 uint64_t key; // key is sub id and mtype banged together
374 rtable_ent_t* rte; // the entry we found
376 if( rt == NULL || rt->hash == NULL ) { // no table, or no hash to search
380 key = build_rt_key( sid, mtype ); // first try with a 'full' key
381 if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL) || ! try_alt ) { // found or not allowed to try the alternate, return what we have
385 if( sid != UNSET_SUBID ) { // not found, and allowed to try alternate; and the sub_id was set
386 key = build_rt_key( UNSET_SUBID, mtype ); // rebuild key
387 rte = rmr_sym_pull( rt->hash, key ); // see what we get with this
394 Return a string of count information. E.g.:
395 <ep-name>:<port> <good> <hard-fail> <soft-fail>
397 Caller must free the string allocated if a buffer was not provided.
399 Pointer returned is to a freshly allocated string, or the user buffer
402 If the endpoint passed is a nil pointer, then we return a nil -- caller
// get_ep_counts: format the endpoint name followed by its GOOD, FAIL and TRANS send
// counters into a buffer of at most ubuf_len bytes.
// NOTE(review): the nil checks and the choice between the caller buffer and a fresh
// allocation are not visible in this listing.
405 static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
406 char* rs; // result string
416 rs = malloc( sizeof( char ) * ubuf_len ); // allocate ubuf_len bytes for the result; the guarding condition is not visible here
419 snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] ); // NOTE(review): %lld assumes scounts elements are long long -- confirm against endpoint_t
425 Given a message, use the meid field to find the owner endpoint for the meid.
426 The owner ep is then used to extract the socket through which the message
427 is sent. This returns TRUE if we found a socket and it was written to the
428 nn_sock pointer; false if we didn't.
430 We've been told that the meid is a string, thus we count on it being a nil
431 terminated set of bytes.
433 If we return false there is no guarantee that the caller's reference to the
434 ep is valid or nil. Caller can trust the ep reference only when the return is
// epsock_meid: read the meid from the message header, find the endpoint that owns it
// via get_meid_owner(), connect that endpoint with uta_link2() when it is not open,
// and hand its socket back through nn_sock.
// NOTE(review): return statements and some branches are not visible in this listing.
437 static int epsock_meid( route_table_t *rtable, rmr_mbuf_t* msg, nng_socket* nn_sock, endpoint_t** uepp ) {
438 endpoint_t* ep; // selected end point
439 int state = FALSE; // processing state
443 if( ! nn_sock || msg == NULL || rtable == NULL ) { // missing stuff; bail fast
448 meid = ((uta_mhdr_t *) msg->header)->meid; // meid field taken from the message header
450 if( (ep = get_meid_owner( rtable, meid )) == NULL ) { // no owner known for this meid
451 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
455 if( uepp != NULL ) { // ensure ep is returned to the caller
460 if( ! ep->open ) { // not connected
461 if( ep->addr == NULL ) { // name didn't resolve before, try again
462 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
465 if( uta_link2( ep ) ) { // find entry in table and create link
467 *nn_sock = ep->nn_sock; // pass socket back to caller
471 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
473 *nn_sock = ep->nn_sock; // presumably the already-open path -- enclosing branch is not visible here