1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: rtable_static.c
23 Abstract: Route table management functions.
24 Author: E. Scott Daniels
25 Date: 29 November 2018
28 #ifndef rtable_static_c
29 #define rtable_static_c
38 #include <sys/types.h>
45 Establish a TCP connection to the indicated target (IP address).
46 Target assumed to be address:port. Requires a separate nano socket;
47 the socket number (for future sends) is returned or -1 on error.
49 static int uta_link2( char* target ) {
50 char conn_info[NN_SOCKADDR_MAX]; // string to give to nano to make the connection
51 int nn_sock; // the nano socket for this link
54 if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port
55 fprintf( stderr, "[INFO] rmr: rmr_link2: unable to create link: invalid target: %s\n", target == NULL ? "<nil>" : target );
59 nn_sock = nn_socket( AF_SP, NN_PUSH ); // the socket we'll use to connect to the target
61 fprintf( stderr, "[WRN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno );
65 snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
66 if( nn_connect( nn_sock, conn_info ) < 0 ) { // connect failed
67 fprintf( stderr, "[WRN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno );
76 This provides a protocol independent mechanism for establishing the connection to an endpoint.
77 Returns true on success; false otherwise.
79 static int rt_link2_ep( endpoint_t* ep ) {
88 ep->nn_sock = uta_link2( ep->addr ) >= 0; // open if a valid socket returned
89 ep->open = ep->nn_sock >= 0;
94 Add an endpoint to a route table entry for the group given. If the endpoint isn't in the
95 hash we add it and create the endpoint struct.
97 The caller must supply the specific route table (we assume it will be pending, but they
98 could live on the edge and update the active one, though that's not at all a good idea).
100 static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ) {
102 rrgroup_t* rrg; // pointer at group to update
104 if( ! rte || ! rt ) {
105 fprintf( stderr, "[WRN] rmr_add_ep didn't get a valid rt and/or rte pointer\n" );
109 if( rte->nrrgroups <= group ) {
110 fprintf( stderr, "[WRN] rmr_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
114 if( (rrg = rte->rrgroups[group]) == NULL ) {
115 if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
116 fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
119 memset( rrg, 0, sizeof( *rrg ) );
121 if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
122 fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
125 memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
127 rte->rrgroups[group] = rrg;
129 rrg->ep_idx = 0; // next to send to
130 rrg->nused = 0; // number populated
131 rrg->nendpts = MAX_EP_GROUP; // number allocated
134 if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
135 if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
136 fprintf( stderr, "uta: [WRN] malloc failed for endpoint creation: %s\n", ep_name );
140 ep->nn_sock = -1; // not connected
142 ep->addr = uta_h2ip( ep_name );
143 ep->name = strdup( ep_name );
145 rmr_sym_put( rt->hash, ep_name, 1, ep );
149 if( rrg->nused >= rrg->nendpts ) {
150 // future: reallocate
151 fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
155 rrg->epts[rrg->nused] = ep;
159 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name );
164 Given a name, find the nano socket needed to send to it. Returns the socket number if
167 static int uta_epsock_byname( route_table_t* rt, char* ep_name ) {
174 ep = rmr_sym_get( rt->hash, ep_name, 1 );
176 if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
181 if( !ep->open ) { // not connected; must connect now
182 if( ep->addr == NULL ) { // name didn't resolve before, try again
183 ep->addr = uta_h2ip( ep->name );
185 ep->nn_sock = uta_link2( ep->addr );
186 ep->open = ep->nn_sock >= 0;
187 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", ep->nn_sock >= 0 ? "[OK]" : "[FAIL]", ep->name );
194 Make a round robin selection within a round robin group for a route table
195 entry. Returns the nanomsg socket number if there is a rte for the message
196 type, and group is defined, else returns -1.
198 The group is the group number to select from.
200 The user supplied integer 'more' will be set if there are additional groups
201 defined to the matching route table entry which have a higher group number.
202 This assumes the caller is making a sequential pass across groups starting
203 with group 0. If more is set, the caller may increase the group number and
204 invoke this function again to make a selection against that group. If there
205 are no more groups, more is set to 0.
207 NOTE: The round robin selection index increment might collide with other
208 threads if multiple threads are attempting to send to the same round
209 robin group; the consequences are small and avoid locking. The only side
210 effect is either sending two messages in a row to, or skipping, an endpoint.
211 Both of these, in the grand scheme of things, are minor compared to the
212 overhead of grabbing a lock on each call.
214 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
215 rtable_ent_t* rte; // matching rt entry
216 endpoint_t* ep; // seected end point
223 if( ! more ) { // eliminate checks each time we need to use
232 if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
234 //if( DEBUG ) fprintf( stderr, "#### >>> rte not found for type key=%lu\n", key );
238 if( group < 0 || group >= rte->nrrgroups ) {
239 //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
244 if( (rrg = rte->rrgroups[group]) == NULL ) {
245 //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %lu\n", key );
246 *more = 0; // groups are inserted contig, so nothing should be after a nil pointer
250 *more = group < rte->nrrgroups-1 ? (rte->rrgroups[group+1] != NULL): 0; // more if something in next group slot
252 switch( rrg->nused ) {
253 case 0: // nothing allocated, just punt
257 case 1: // exactly one, no rr to deal with
258 nn_sock = rrg->epts[0]->nn_sock;
262 default: // need to pick one and adjust rr counts
263 idx = rrg->ep_idx++ % rrg->nused; // see note above
265 nn_sock = ep->nn_sock;
269 if( ep && ! ep->open ) { // not connected
270 if( ep->addr == NULL ) { // name didn't resolve before, try again
271 ep->addr = uta_h2ip( ep->name );
273 ep->nn_sock = nn_sock = uta_link2( ep->addr );
274 ep->open = ep->nn_sock >= 0;
275 if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection state to %s: %s\n", ep->name, nn_sock >= 0 ? "[OK]" : "[FAIL]" );