nn_sock = nn_socket( AF_SP, NN_PUSH ); // the socket we'll use to connect to the target
if( nn_sock < 0 ) {
- fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno );
+ fprintf( stderr, "[WRN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno );
return -1;
}
snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
if( nn_connect( nn_sock, conn_info ) < 0 ) { // connect failed
- fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno );
+ fprintf( stderr, "[WRN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno );
nn_close( nn_sock );
return -1;
}
rrgroup_t* rrg; // pointer at group to update
if( ! rte || ! rt ) {
- fprintf( stderr, "[WARN] rmr_add_ep didn't get a valid rt and/or rte pointer\n" );
+ fprintf( stderr, "[WRN] rmr_add_ep didn't get a valid rt and/or rte pointer\n" );
return NULL;
}
if( rte->nrrgroups <= group ) {
- fprintf( stderr, "[WARN] rmr_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
+ fprintf( stderr, "[WRN] rmr_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups );
return NULL;
}
if( (rrg = rte->rrgroups[group]) == NULL ) {
if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) {
- fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
+ fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group );
return NULL;
}
memset( rrg, 0, sizeof( *rrg ) );
if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) {
- fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
+ fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group );
return NULL;
}
memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP );
if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
- fprintf( stderr, "uta: [WARN] malloc failed for endpoint creation: %s\n", ep_name );
+ fprintf( stderr, "uta: [WRN] malloc failed for endpoint creation: %s\n", ep_name );
return NULL;
}
if( rrg != NULL ) {
if( rrg->nused >= rrg->nendpts ) {
// future: reallocate
- fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
+ fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group );
return NULL;
}
with group 0. If more is set, the caller may increase the group number and
invoke this function again to make a selection against that group. If there
are no more groups, more is set to 0.
+
+ NOTE: The round robin selection index increment might collide with other
+ threads if multiple threads are attempting to send to the same round
+ robin group; the consequences are small and avoid locking. The only side
+ effect is either sending two messages in a row to, or skipping, an endpoint.
+ Both of these, in the grand scheme of things, is minor compared to the
+ overhead of grabbing a lock on each call.
*/
static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
rtable_ent_t* rte; // matching rt entry
int nn_sock = -2;
int dummy;
rrgroup_t* rrg;
+ int idx;
if( ! more ) { // eliminate checks each time we need to use
break;
default: // need to pick one and adjust rr counts
- ep = rrg->epts[rrg->ep_idx];
- nn_sock = rrg->epts[rrg->ep_idx++]->nn_sock;
- if( rrg->ep_idx >= rrg->nused ) {
- rrg->ep_idx = 0;
- }
+ idx = rrg->ep_idx++ % rrg->nused; // see note above
+ ep = rrg->epts[idx];
+ nn_sock = ep->nn_sock;
break;
}