// vim: ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019-2020 Nokia
- Copyright (c) 2018-2020 AT&T Intellectual Property.
+ Copyright (c) 2019-2021 Nokia
+ Copyright (c) 2018-2021 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
blen++;
*blen = htonl( len ); // new systems want a converted integer
+ memset( &buf[TP_SZFIELD_LEN], 0, 4 ); // clear to prevent future conversion issues
buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER; // marker to flag this is generated by a new message
}
msg->state = state; // fill in caller's state (likely the state of the last operation)
msg->flags = MFL_ZEROCOPY; // this is a zerocopy sendable message
msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+ zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
+ zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
switch( ntohl( v1hdr->rmr_ver ) ) {
case 1:
- memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
+ hdr = nm->header;
+ memcpy( hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
nm->payload = (void *) v1hdr + sizeof( *v1hdr );
break;
v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
switch( ntohl( v1hdr->rmr_ver ) ) {
case 1:
+ v1hdr = nm->header;
memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
nm->payload = (void *) v1hdr + sizeof( *v1hdr );
break;
nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old
SET_HDR_LEN( nm->header );
- if( copy ) { // if we need to copy the old payload too
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
+ if( copy != 0 ) { // if we need to copy the old payload too
memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
} else { // just need to copy header
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
}
ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+ zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
+ zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
}
if( retries == 0 ) {
rmr_free_msg( msg ); // not wanting a message back, trash this one
return NULL;
}
- } else { // send failed -- return original message
- if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
+ } else { // send failed or would block -- return original message
+ if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
errno = EAGAIN;
- msg->state = RMR_ERR_RETRY; // errno will have nano reason
+ msg->state = RMR_ERR_RETRY;
} else {
+ rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
msg->state = RMR_ERR_SENDFAILED;
}
int sock_ok; // got a valid socket from round robin select
char* d1;
int ok_sends = 0; // track number of ok sends
+ route_table_t* rt; // active route table
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
max_to = ctx->send_retries; // convert to retries
}
- if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
+ rt = get_rt( ctx ); // get active route table and up ref count
+ if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
+ release_rt( ctx, rt );
rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed
sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
} else {
- sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+ sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
send_again = 0;
}
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
if( clone_m == NULL ) {
+ release_rt( ctx, rt );
msg->state = RMR_ERR_SENDFAILED;
errno = ENOMEM;
msg->tp_state = errno;
} else {
ok_sends++;
msg = clone_m; // clone will be the next to send
+ msg->state = RMR_OK;
}
} else {
msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
case RMR_OK:
ep->scounts[EPSC_GOOD]++;
break;
-
+
case RMR_ERR_RETRY:
ep->scounts[EPSC_TRANS]++;
break;
}
}
+ release_rt( ctx, rt ); // we can safely dec the ref counter now
+
if( msg ) { // call functions don't get a buffer back, so a nil check is required
msg->flags &= ~MFL_NOALLOC; // must return with this flag off
if( ok_sends ) { // multiple rr-groups and one was successful; report ok