1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtable_nng_static.c
23 Abstract: Route table management functions which depend on the underlying
24 transport mechanism and thus cannot be included with the generic
25 route table functions.
27 This module is designed to be included by any module (main) needing
28 the static/private stuff.
30 Author: E. Scott Daniels
31 Date: 29 November 2018
34 #ifndef rtable_static_c
35 #define rtable_static_c
44 #include <sys/types.h>
49 // -----------------------------------------------------------------------------------------------------
52 Establish a TCP connection to the indicated target (IP address).
53 Target assumed to be address:port. The new socket is returned via the
54 user supplied pointer so that a success/fail code is returned directly.
55 Return value is 0 (false) on failure, 1 (true) on success.
57 In order to support multi-threaded user applications we must hold a lock before
58 we attempt to create a dialer and connect. NNG is thread safe, but we can
59 get things into a bad state if we allow a collision here. The lock grab
60 only happens on the intial session setup.
62 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
63 static int uta_link2( endpoint_t* ep ) {
64 static int flags = -1;
69 char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
79 tok = getenv( "RMR_ASYNC_CONN" );
80 if( tok == NULL || *tok == '1' ) {
81 flags = NNG_FLAG_NONBLOCK; // start dialer asynch
87 target = ep->name; // always give name to transport so chaning dest IP does not break reconnect
88 nn_sock = &ep->nn_sock;
91 if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
92 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
96 if( nn_sock == NULL ) {
101 pthread_mutex_lock( &ep->gate ); // grab the lock
103 pthread_mutex_unlock( &ep->gate );
108 if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
109 pthread_mutex_unlock( &ep->gate );
110 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
114 snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
115 if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
116 pthread_mutex_unlock( &ep->gate );
117 fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
118 nng_close( *nn_sock );
122 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMAXT, 2000 ); // cap backoff on retries to reasonable amount (2s)
123 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap
125 if( (state = nng_dialer_start( *dialer, flags )) != 0 ) { // can fail immediatly (unlike nanomsg)
126 pthread_mutex_unlock( &ep->gate );
127 fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
128 nng_close( *nn_sock );
132 if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
134 ep->open = TRUE; // must set before release
135 pthread_mutex_unlock( &ep->gate );
140 This provides a protocol independent mechanism for establishing the connection to an endpoint.
141 Return is true (1) if the link was opened; false on error.
143 static int rt_link2_ep( endpoint_t* ep ) {
148 if( ep->open ) { // already open, do nothing
158 Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
159 hash we add it and create the endpoint struct.
161 The caller must supply the specific route table (we assume it will be pending, but they
162 could live on the edge and update the active one, though that's not at all a good idea).
164 extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ) {
166 rrgroup_t* rrg; // pointer at group to update
168 if( ! rte || ! rt ) {
169 fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
173 if( rte->nrrgroups <= group ) {
174 fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
178 if( (rrg = rte->rrgroups[group]) == NULL ) {
179 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
180 fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
183 memset( rrg, 0, sizeof( *rrg ) );
185 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
186 fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
189 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
191 rte->rrgroups[group] = rrg;
193 rrg->ep_idx = 0; // next endpoint to send to
194 rrg->nused = 0; // number populated
195 rrg->nendpts = MAX_EP_GROUP; // number allocated
198 ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
201 if( rrg->nused >= rrg->nendpts ) {
202 // future: reallocate
203 fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
207 rrg->epts[rrg->nused] = ep;
211 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
217 Given a name, find the nano socket needed to send to it. Returns the socket via
218 the user pointer passed in and sets the return value to true (1). If the
219 endpoint cannot be found false (0) is returned.
221 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
229 ep = rmr_sym_get( rt->hash, ep_name, 1 );
231 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
232 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
237 if( ! ep->open ) { // not open -- connect now
238 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
239 if( ep->addr == NULL ) { // name didn't resolve before, try again
240 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
242 if( uta_link2( ep ) ) { // find entry in table and create link
245 *nn_sock = ep->nn_sock; // pass socket back to caller
247 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
249 *nn_sock = ep->nn_sock;
257 Make a round robin selection within a round robin group for a route table
258 entry. Returns the nanomsg socket if there is a rte for the message
259 key, and group is defined. Socket is returned via pointer in the parm
262 The group is the group number to select from.
264 The user supplied (via pointer to) integer 'more' will be set if there are
265 additional groups beyond the one selected. This allows the caller to
266 to easily iterate over the group list -- more is set when the group should
267 be incremented and the function invoked again. Groups start at 0.
269 The return value is true (>0) if the socket was found and *nn_sock was updated
270 and false (0) if there is no associated socket for the msg type, group combination.
271 We return the index+1 from the round robin table on success so that we can verify
272 during test that different entries are being seleted; we cannot depend on the nng
273 socket being different as we could with nano.
275 NOTE: The round robin selection index increment might collide with other
276 threads if multiple threads are attempting to send to the same round
277 robin group; the consequences are small and avoid locking. The only side
278 effect is either sending two messages in a row to, or skipping, an endpoint.
279 Both of these, in the grand scheme of things, is minor compared to the
280 overhead of grabbing a lock on each call.
282 static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) {
283 //rtable_ent_t* rte; // matching rt entry
284 endpoint_t* ep; // seected end point
285 int state = FALSE; // processing state
291 if( ! more ) { // eliminate cheks each time we need to use
295 if( ! nn_sock ) { // user didn't supply a pointer
306 if( group < 0 || group >= rte->nrrgroups ) {
307 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups );
312 if( (rrg = rte->rrgroups[group]) == NULL ) {
313 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group );
314 *more = 0; // groups are inserted contig, so nothing should be after a nil pointer
318 *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
320 switch( rrg->nused ) {
321 case 0: // nothing allocated, just punt
322 //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
325 case 1: // exactly one, no rr to deal with
327 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
331 default: // need to pick one and adjust rr counts
333 idx = rrg->ep_idx++ % rrg->nused; // see note above
334 ep = rrg->epts[idx]; // select next endpoint
335 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
336 state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
340 if( state ) { // end point selected, open if not, get socket either way
341 if( ! ep->open ) { // not connected
342 if( ep->addr == NULL ) { // name didn't resolve before, try again
343 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
346 if( uta_link2( ep ) ) { // find entry in table and create link
348 *nn_sock = ep->nn_sock; // pass socket back to caller
352 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
354 *nn_sock = ep->nn_sock;
362 Finds the rtable entry which matches the key. Returns a nil pointer if
363 no entry is found. If try_alternate is set, then we will attempt
364 to find the entry with a key based only on the message type.
366 static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
367 uint64_t key; // key is sub id and mtype banged together
368 rtable_ent_t* rte; // the entry we found
370 if( rt == NULL || rt->hash == NULL ) {
374 key = build_rt_key( sid, mtype ); // first try with a 'full' key
375 if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL) || ! try_alt ) { // found or not allowed to try the alternate, return what we have
379 if( sid != UNSET_SUBID ) { // not found, and allowed to try alternate; and the sub_id was set
380 key = build_rt_key( UNSET_SUBID, mtype ); // rebuild key
381 rte = rmr_sym_pull( rt->hash, key ); // see what we get with this