1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtable_nng_static.c
23 Abstract: Route table management functions which depend on the underlying
24 transport mechanism and thus cannot be included with the generic
25 route table functions.
27 This module is designed to be included by any module (main) needing
28 the static/private stuff.
30 Author: E. Scott Daniels
31 Date: 29 November 2018
34 #ifndef rtable_static_c
35 #define rtable_static_c
44 #include <sys/types.h>
49 // -----------------------------------------------------------------------------------------------------
52 Establish a TCP connection to the indicated target (IP address).
53 Target assumed to be address:port. The new socket is returned via the
54 user supplied pointer so that a success/fail code is returned directly.
55 Return value is 0 (false) on failure, 1 (true) on success.
57 In order to support multi-threaded user applications we must hold a lock before
58 we attempt to create a dialer and connect. NNG is thread safe, but we can
59 get things into a bad state if we allow a collision here. The lock grab
60 only happens on the initial session setup.
62 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
// Open an NNG push socket for the endpoint and dial its target while holding ep->gate.
// Every visible failure path releases ep->gate before logging; ep->open is set only after the dialer starts.
// NOTE(review): local declarations, return statements and closing braces are not visible in this view.
63 static int uta_link2( endpoint_t* ep ) {
67 char conn_info[NNG_MAXADDRLEN]; // URL string (tcp://address:port) handed to NNG to make the connection
76 nn_sock = &ep->nn_sock;
79 if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port (target must contain a colon)
80 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
84 if( nn_sock == NULL ) {
89 pthread_mutex_lock( &ep->gate ); // grab the lock; held across socket open, dialer create and dialer start
91 pthread_mutex_unlock( &ep->gate );
96 if( nng_push0_open( nn_sock ) != 0 ) { // open the socket in push mode
97 pthread_mutex_unlock( &ep->gate );
98 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
102 snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
103 if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
104 pthread_mutex_unlock( &ep->gate );
// NOTE(review): this message prints errno, while the failure code from nng_dialer_create is in state
// (the dialer start failure below reports nng_strerror( state )) -- confirm which is intended.
105 fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
106 nng_close( *nn_sock );
110 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMAXT, 2000 ); // cap backoff on retries to reasonable amount (2s)
111 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100ms after last failure with 2s cap
113 if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediately (unlike nanomsg)
114 pthread_mutex_unlock( &ep->gate );
115 fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
116 nng_close( *nn_sock );
120 if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
122 ep->open = TRUE; // must set before release
123 pthread_mutex_unlock( &ep->gate );
128 This provides a protocol independent mechanism for establishing the connection to an endpoint.
129 Return is true (1) if the link was opened; false on error.
// Transport independent entry point for connecting to an endpoint.
// NOTE(review): only the signature and the first test are visible in this view.
131 static int rt_link2_ep( endpoint_t* ep ) {
136 if( ep->open ) { // already open, do nothing
146 Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
147 hash we add it and create the endpoint struct.
149 The caller must supply the specific route table (we assume it will be pending, but they
150 could live on the edge and update the active one, though that's not at all a good idea).
// Add the named endpoint to round robin group number group of the route table entry rte.
// The group struct and its endpoint pointer array are allocated on first use of that group slot;
// the endpoint itself comes from rt_ensure_ep( rt, ep_name ).
// NOTE(review): return statements and closing braces are not visible in this view.
152 extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ) {
154 rrgroup_t* rrg; // pointer at group to update
156 if( ! rte || ! rt ) {
157 fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
// NOTE(review): only the upper bound is tested in the line shown; a negative group is not rejected here -- confirm callers never pass one.
161 if( rte->nrrgroups <= group ) {
162 fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
166 if( (rrg = rte->rrgroups[group]) == NULL ) { // first endpoint for this group; build the group
167 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
168 fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
171 memset( rrg, 0, sizeof( *rrg ) );
// NOTE(review): epts is an array of endpoint_t pointers, yet the size below (and in the memset) uses
// sizeof( endpoint_t ); this looks like an over-allocation and sizeof( endpoint_t * ) was presumably meant -- confirm.
173 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
174 fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
177 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
179 rte->rrgroups[group] = rrg;
181 rrg->ep_idx = 0; // next endpoint to send to
182 rrg->nused = 0; // number populated
183 rrg->nendpts = MAX_EP_GROUP; // number allocated
186 ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
189 if( rrg->nused >= rrg->nendpts ) { // no free slot left in the fixed size array
190 // future: reallocate
191 fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
195 rrg->epts[rrg->nused] = ep;
199 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
205 Given a name, find the nano socket needed to send to it. Returns the socket via
206 the user pointer passed in and sets the return value to true (1). If the
207 endpoint cannot be found false (0) is returned.
// Look up the endpoint named ep_name in rt->hash, connect it via uta_link2 when it is not yet open,
// and hand its socket back through nn_sock.
// NOTE(review): declarations, some conditions, return statements and closing braces are not visible in this view.
209 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
217 ep = rmr_sym_get( rt->hash, ep_name, 1 );
219 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
// NOTE(review): ep_name is tested for NULL here, but in the lines shown it has already been passed to
// rmr_sym_get above -- confirm an earlier guard exists or move the test ahead of the lookup.
220 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
225 if( ! ep->open ) { // not open -- connect now
226 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
227 if( ep->addr == NULL ) { // name didn't resolve before, try again
228 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
230 if( uta_link2( ep ) ) { // open socket and dial the endpoint; true on success
233 *nn_sock = ep->nn_sock; // pass socket back to caller
235 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
237 *nn_sock = ep->nn_sock;
245 Make a round robin selection within a round robin group for a route table
246 entry. Returns the nanomsg socket if there is a rte for the message
247 type, and group is defined. Socket is returned via pointer in the parm
250 The group is the group number to select from.
252 The user supplied (via pointer to) integer 'more' will be set if there are
253 additional groups beyond the one selected. This allows the caller to
254 easily iterate over the group list -- more is set when the group should
255 be incremented and the function invoked again. Groups start at 0.
257 The return value is true (>0) if the socket was found and *nn_sock was updated
258 and false (0) if there is no associated socket for the msg type, group combination.
259 We return the index+1 from the round robin table on success so that we can verify
260 during test that different entries are being selected; we cannot depend on the nng
261 socket being different as we could with nano.
263 NOTE: The round robin selection index increment might collide with other
264 threads if multiple threads are attempting to send to the same round
265 robin group; the consequences are small and avoid locking. The only side
266 effect is either sending two messages in a row to, or skipping, an endpoint.
267 Both of these, in the grand scheme of things, are minor compared to the
268 overhead of grabbing a lock on each call.
// Select an endpoint from round robin group number group of the route table entry stored under key,
// connect it via uta_link2 when it is not yet open, and hand its socket back through nn_sock.
// *more is set when the next group slot of the entry is populated.
// NOTE(review): some declarations, return statements and closing braces are not visible in this view.
270 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
271 rtable_ent_t* rte; // matching rt entry
272 endpoint_t* ep; // selected end point
273 int state = FALSE; // processing state
279 if( ! more ) { // eliminate checks each time we need to use
283 if( ! nn_sock ) { // user didn't supply a pointer
294 if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
296 //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key );
300 if( group < 0 || group >= rte->nrrgroups ) {
301 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
306 if( (rrg = rte->rrgroups[group]) == NULL ) {
307 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key );
308 *more = 0; // groups are inserted contig, so nothing should be after a nil pointer
312 *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
314 switch( rrg->nused ) {
315 case 0: // nothing allocated, just punt
316 //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
319 case 1: // exactly one, no rr to deal with and more is not possible even if fanout > 1
320 //*nn_sock = rrg->epts[0]->nn_sock;
322 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
326 default: // need to pick one and adjust rr counts
328 idx = rrg->ep_idx++ % rrg->nused; // unlocked increment; see note above about collisions between threads
329 ep = rrg->epts[idx]; // select next endpoint
330 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
331 state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
335 if( state ) { // end point selected, open if not, get socket either way
336 if( ! ep->open ) { // not connected
337 if( ep->addr == NULL ) { // name didn't resolve before, try again
338 ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
341 if( uta_link2( ep ) ) { // open socket and dial the endpoint; true on success
343 *nn_sock = ep->nn_sock; // pass socket back to caller
347 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
349 *nn_sock = ep->nn_sock;