1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtable_nng_static.c
23 Abstract: Route table management functions which depend on the underlying
24 transport mechanism and thus cannot be included with the generic
25 route table functions.
27 This module is designed to be included by any module (main) needing
28 the static/private stuff.
30 Author: E. Scott Daniels
31 Date: 29 November 2018
34 #ifndef rtable_static_c
35 #define rtable_static_c
44 #include <sys/types.h>
49 // -----------------------------------------------------------------------------------------------------
52 Establish a TCP connection to the indicated target (IP address).
53 Target assumed to be address:port. The new socket is returned via the
54 user supplied pointer so that a success/fail code is returned directly.
55 Return value is 0 (false) on failure, 1 (true) on success.
// Open an NNG push socket and dial the given "address:port" target over TCP.
//   target  - destination string; rejected when NULL or when it has no ':'.
//   nn_sock - receives the opened socket; tested for NULL before it is used.
//   dialer  - receives the dialer created on that socket.
// NOTE(review): the return statements and closing braces are not visible in this
// view, so only the visible statements are described here.
57 static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) {
58 char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
// a usable target must be non-NULL and contain a ':' separating address from port
62 if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
63 fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "<nil>" : target );
// the caller must supply somewhere to put the socket
67 if( nn_sock == NULL ) {
72 if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
73 fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target );
// build the URL handed to the dialer; snprintf bounds the write to the size of conn_info
77 snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
78 if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) {
// NOTE(review): the failure code is captured in state, but errno is what gets printed;
// the nng_dialer_start failure below reports nng_strerror( state ) instead -- confirm intent
79 fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno );
// close the socket opened above, as no dialer could be attached to it
80 nng_close( *nn_sock );
84 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMAXT, 2000 ); // cap backoff on retries to reasonable amount (2s)
85 nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100ms after last failure with 2s cap
87 if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediately (unlike nanomsg)
88 fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) );
// close the socket on a failed start as well
89 nng_close( *nn_sock );
93 if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target );
99 This provides a protocol independent mechanism for establishing the connection to an endpoint.
100 Return is true (1) if the link was opened; false on error.
// Make sure the endpoint has a link: an endpoint already marked open is left alone,
// otherwise uta_link2() is called with the endpoint's address, socket and dialer.
// NOTE(review): ep is dereferenced in the visible lines; any NULL guard and the
// return statements would be on lines not shown in this view.
102 static int rt_link2_ep( endpoint_t* ep ) {
107 if( ep->open ) { // already open, do nothing
// the result of the dial attempt becomes the endpoint's open flag
111 ep->open = uta_link2( ep->addr, &ep->nn_sock, &ep->dialer );
117 Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
118 hash we add it and create the endpoint struct.
120 The caller must supply the specific route table (we assume it will be pending, but they
121 could live on the edge and update the active one, though that's not at all a good idea).
// Attach the endpoint named ep_name to round robin group 'group' of the route table
// entry rte, allocating the group (and its endpoint pointer array) on first use.
//   rt      - route table whose hash holds the known endpoints.
//   rte     - route table entry that owns the round robin groups.
//   ep_name - endpoint name used as the hash key.
//   group   - index into rte->rrgroups.
// NOTE(review): the return statements and closing braces are not visible in this
// view, so only the visible statements are described here.
123 extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ) {
125 rrgroup_t* rrg; // pointer at group to update
// both the table and the entry are required
127 if( ! rte || ! rt ) {
// NOTE(review): the message says "uda_add_ep" although the function is uta_add_ep
128 fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" );
// NOTE(review): only the upper bound is tested here; a negative group is not rejected
// by this visible check (uta_epsock_rr tests group < 0) -- confirm callers never pass one
132 if( rte->nrrgroups <= group ) {
133 fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
// first endpoint for this group: create the group and its endpoint array
137 if( (rrg = rte->rrgroups[group]) == NULL ) {
138 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
139 fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
142 memset( rrg, 0, sizeof( *rrg ) );
// NOTE(review): epts is an array of endpoint_t pointers (see the cast and the store of ep
// below) but it is sized with sizeof( endpoint_t ); sizeof( endpoint_t * ) looks like the
// intended element size -- confirm
144 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
145 fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
// clear the array using the same byte count that was allocated
148 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
// publish the new group in the entry
150 rte->rrgroups[group] = rrg;
152 rrg->ep_idx = 0; // next endpoint to send to
153 rrg->nused = 0; // number populated
154 rrg->nendpts = MAX_EP_GROUP; // number allocated
// NOTE(review): ep is assigned here and again by uta_get_ep() on the next visible line;
// the lines between them are not shown, so these may be alternative code paths -- confirm
157 ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known
159 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
160 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
161 fprintf( stderr, "uta: [WARN] malloc failed for endpoint creation: %s\n", ep_name );
165 ep->open = 0; // not connected
// address comes from uta_h2ip(); uta_epsock_byname and uta_epsock_rr retry it when addr is NULL
166 ep->addr = uta_h2ip( ep_name );
// keep a private copy of the name; NOTE(review): the strdup result is not checked in the visible lines
167 ep->name = strdup( ep_name );
// register the endpoint in the table hash under its name, using 1 as the third argument
169 rmr_sym_put( rt->hash, ep_name, 1, ep );
// the array has a fixed capacity (nendpts); a full group only produces a warning
174 if( rrg->nused >= rrg->nendpts ) {
175 // future: reallocate
176 fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
// store at the next free slot; the update of nused is not visible in this view
180 rrg->epts[rrg->nused] = ep;
184 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
190 Given a name, find the nano socket needed to send to it. Returns the socket via
191 the user pointer passed in and sets the return value to true (1). If the
192 endpoint cannot be found false (0) is returned.
// Look up an endpoint by name in the route table hash and hand its socket back through
// nn_sock, dialing the endpoint first when it is not yet open.
//   rt      - route table whose hash is searched.
//   ep_name - endpoint name used as the hash key.
//   nn_sock - receives the endpoint's socket.
// NOTE(review): rt is dereferenced in the visible lines; any NULL guards, the declaration
// of state, and the return statements are on lines not shown in this view.
194 static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
// same hash and third argument (1) that uta_add_ep uses when registering endpoints
203 ep = rmr_sym_get( rt->hash, ep_name, 1 );
205 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
209 if( ! ep->open ) { // not open -- connect now
210 if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name );
211 if( ep->addr == NULL ) { // name didn't resolve before, try again
212 ep->addr = uta_h2ip( ep->name );
214 if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link
217 *nn_sock = ep->nn_sock; // pass socket back to caller
219 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name );
// NOTE(review): second assignment of the socket; presumably the already-open path -- confirm
221 *nn_sock = ep->nn_sock;
229 Make a round robin selection within a round robin group for a route table
230 entry. Returns the nanomsg socket if there is a rte for the message
231 type, and group is defined. Socket is returned via pointer in the parm
234 The group is the group number to select from.
236 The user supplied (via pointer to) integer 'more' will be set if there are
237 additional groups beyond the one selected. This allows the caller to
238 to easily iterate over the group list -- more is set when the group should
239 be incremented and the function invoked again. Groups start at 0.
241 The return value is true (>0) if the socket was found and *nn_sock was updated
242 and false (0) if there is no associated socket for the msg type, group combination.
243 We return the index+1 from the round robin table on success so that we can verify
244 during test that different entries are being selected; we cannot depend on the nng
245 socket being different as we could with nano.
// Select the next endpoint, round robin, from group 'group' of the route table entry for
// message type mtype, dial it if it is not open, and pass its socket back via nn_sock.
//   rt      - route table whose hash maps mtype to a route table entry.
//   mtype   - message type used as the lookup key.
//   group   - index of the round robin group; checked against 0..nrrgroups-1.
//   more    - set to non-zero when the next group slot is populated.
//   nn_sock - receives the selected endpoint's socket.
// NOTE(review): the return statements, the declaration of rrg, and several branch bodies
// are on lines not shown in this view.
247 static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ) {
248 rtable_ent_t* rte; // matching rt entry
249 endpoint_t* ep; // selected end point
250 int state = FALSE; // processing state
255 if( ! more ) { // eliminate checks each time we need to use it
259 if( ! nn_sock ) { // user didn't supply a pointer
// find the route table entry for this message type
270 if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
272 //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
// the group must be a valid index into rte->rrgroups
276 if( group < 0 || group >= rte->nrrgroups ) {
277 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
282 if( (rrg = rte->rrgroups[group]) == NULL ) {
283 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
284 *more = 0; // groups are inserted contig, so nothing should be after a nil pointer
// the bounds test comes first so rrgroups[group+1] is only read when it is a valid slot
288 *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
// how the endpoint is chosen depends on how many are populated in the group
290 switch( rrg->nused ) {
291 case 0: // nothing allocated, just punt
292 //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
295 case 1: // exactly one, no rr to deal with and more is not possible even if fanout > 1
296 //*nn_sock = rrg->epts[0]->nn_sock;
298 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" );
302 default: // need to pick one and adjust rr counts
303 ep = rrg->epts[rrg->ep_idx++]; // select next endpoint
304 //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
// index ran past the populated entries; the body of this branch is not visible here
305 if( rrg->ep_idx >= rrg->nused ) {
// NOTE(review): computed from ep_idx after the post-increment above, so it reflects the
// advanced index rather than the slot that was just read -- confirm this is intended
308 state = rrg->ep_idx+1;
312 if( state ) { // end point selected, open if not, get socket either way
313 if( ! ep->open ) { // not connected
314 if( ep->addr == NULL ) { // name didn't resolve before, try again
315 ep->addr = uta_h2ip( ep->name );
318 if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link
320 *nn_sock = ep->nn_sock; // pass socket back to caller
324 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
// NOTE(review): second assignment of the socket; presumably the already-open path -- confirm
326 *nn_sock = ep->nn_sock;