// vim: ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019-2020 Nokia
- Copyright (c) 2018-2020 AT&T Intellectual Property.
+ Copyright (c) 2019-2021 Nokia
+ Copyright (c) 2018-2021 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
-
- if( v1hdr->rmr_ver == 1 ) { // bug in verion 1 didn't encode the version in network byte order
- ver = 1;
- v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
- } else {
- ver = ntohl( v1hdr->rmr_ver );
- }
+ ver = ntohl( v1hdr->rmr_ver );
switch( ver ) {
- case 1:
- msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
- msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
- msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
+ // version 1 is deprecated case 1:
+ // version 2 is deprecated case 2:
- msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
- msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
- msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
- msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
- msg->state = RMR_OK;
- hlen = sizeof( uta_v1mhdr_t );
- break;
+ case 3:
+ // fall-through
default: // current version always lands here
hdr = (uta_mhdr_t *) msg->header;
nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
switch( ntohl( v1hdr->rmr_ver ) ) {
- case 1:
- hdr = nm->header;
- memcpy( hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
- nm->payload = (void *) v1hdr + sizeof( *v1hdr );
- break;
+ // version 1 deprecated case 1:
+ // version 2 deprecated
+ case 3:
+ // fall-through
default: // current message always caught here
hdr = nm->header;
memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
switch( ntohl( v1hdr->rmr_ver ) ) {
- case 1:
- v1hdr = nm->header;
- memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
- nm->payload = (void *) v1hdr + sizeof( *v1hdr );
- break;
+ // version 1 not supported
+ // version 2 not supported
+ case 3:
+ // fall-through
default: // current message version always caught here
hdr = nm->header;
nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old
SET_HDR_LEN( nm->header );
- if( copy ) { // if we need to copy the old payload too
+ if( copy != 0 ) { // if we need to copy the old payload too
memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
} else { // just need to copy header
return nm;
}
-/*
- For SI95 based transport all receives are driven through the threaded
- ring and thus this function should NOT be called. If it is we will panic
- and abort straight away.
-*/
-static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
-
-fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" );
-exit( 1 );
-
- return NULL;
-}
-
/*
This does the hard work of actually sending the message to the given socket. On success,
a new message struct is returned. On error, the original msg is returned with the state
rmr_free_msg( msg ); // not wanting a meessage back, trash this one
return NULL;
}
- } else { // send failed -- return original message
- if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
+ } else { // send failed or would block -- return original message
+ if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
errno = EAGAIN;
- msg->state = RMR_ERR_RETRY; // errno will have nano reason
+ msg->state = RMR_ERR_RETRY;
} else {
+ rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
msg->state = RMR_ERR_SENDFAILED;
}
int sock_ok; // got a valid socket from round robin select
char* d1;
int ok_sends = 0; // track number of ok sends
+ route_table_t* rt; // active route table
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
max_to = ctx->send_retries; // convert to retries
}
- if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
+ rt = get_rt( ctx ); // get active route table and up ref count
+ if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
+ release_rt( ctx, rt );
rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed
sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
} else {
- sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+ sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
send_again = 0;
}
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
if( clone_m == NULL ) {
+ release_rt( ctx, rt );
msg->state = RMR_ERR_SENDFAILED;
errno = ENOMEM;
msg->tp_state = errno;
}
}
- if( ep != NULL && msg != NULL ) {
- switch( msg->state ) {
- case RMR_OK:
- ep->scounts[EPSC_GOOD]++;
- break;
-
- case RMR_ERR_RETRY:
- ep->scounts[EPSC_TRANS]++;
- break;
-
- default:
- ep->scounts[EPSC_FAIL]++;
- uta_ep_failed( ep ); // sending to ep failed; set up to reconnect
- break;
- }
+ if( msg != NULL ) {
+ incr_ep_counts( msg->state, ep );
}
} else {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
}
}
+ release_rt( ctx, rt ); // we can safely dec the ref counter now
+
if( msg ) { // call functions don't get a buffer back, so a nil check is required
msg->flags &= ~MFL_NOALLOC; // must return with this flag off
if( ok_sends ) { // multiple rr-groups and one was successful; report ok