X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fnng%2Fsrc%2Fsr_nng_static.c;fp=src%2Fnng%2Fsrc%2Fsr_nng_static.c;h=369c68ec43709417aefc17a13144070fda28c501;hb=68c5cf1104e89f5c43786a3e48f5c6a1e757f59f;hp=cfea8291f3fcaea569b5329e554eb41d9a6a7d8d;hpb=d52954976038217d6067bc6e8e83d0ca882f9b6b;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nng/src/sr_nng_static.c b/src/nng/src/sr_nng_static.c index cfea829..369c68e 100644 --- a/src/nng/src/sr_nng_static.c +++ b/src/nng/src/sr_nng_static.c @@ -107,10 +107,10 @@ static inline int xlate_nng_state( int state, int def_state ) { nng messages. */ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) { - size_t mlen; + size_t mlen; // size of the transport buffer that we'll allocate uta_mhdr_t* hdr; // convenience pointer - mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer + mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init if( msg == NULL ) { @@ -131,12 +131,17 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s } msg->header = nng_msg_body( msg->tp_buf ); + memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) { - hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday + hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version + SET_HDR_LEN( hdr ); // ensure these are converted to net byte order + SET_HDR_TR_LEN( hdr, ctx->trace_data_len ); + //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them + //SET_HDR_D2_LEN( hdr, ctx->d2_len ); } msg->len = 0; // length of data in the payload - msg->alloc_len = mlen; // length of allocated payload - msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) + msg->alloc_len = mlen; // length of allocated transport buffer + msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area msg->state = state; // fill in caller's state (likely the state of the last operation) msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message @@ -185,21 +190,59 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) { The alen parm is the assumed allocated length; assumed because it's a value likely to have come from nng receive and the actual alloc len might be larger, but we can only assume this is the total usable space. + + This function returns the message with an error state set if it detects that the + received message might have been truncated. Check is done here as the calculation + is somewhat based on header version. */ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) { - uta_mhdr_t* hdr; + uta_mhdr_t* hdr; // current header + uta_v1mhdr_t* v1hdr; // version 1 header + int ver; + int hlen; // header len to use for a truncation check msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer + v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version - hdr = (uta_mhdr_t *) msg->header; - hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday - msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger) - msg->alloc_len = alen; // length of whole tp buffer (including header) - msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) - msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area - msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message - msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order - msg->state = RMR_OK; + if( v1hdr->rmr_ver == 1 ) { // bug in verion 1 didn't encode the version in network byte order + ver = 1; + v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message + } else { + ver = ntohl( v1hdr->rmr_ver ); + } + + switch( ver ) { + case 1: + msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger) + msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits) + msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above) + + msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area + msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order + msg->state = RMR_OK; + hlen = sizeof( uta_v1mhdr_t ); + break; + + default: // current version always lands here + hdr = (uta_mhdr_t *) msg->header; + msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger) + msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits) + + msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits + msg->xaction = &hdr->xid[0]; // point at transaction id in header area + msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order + hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later + break; + } + + if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error + msg->state = RMR_ERR_TRUNC; + msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun + } else { + msg->state = RMR_OK; + } } /* @@ -209,6 +252,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { rmr_mbuf_t* nm; // new message buffer size_t mlen; int state; + uta_mhdr_t* hdr; + uta_v1mhdr_t* v1hdr; nm = (rmr_mbuf_t *) malloc( sizeof *nm ); if( nm == NULL ) { @@ -223,16 +268,29 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { exit( 1 ); } - nm->header = nng_msg_body( nm->tp_buf ); + nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message + v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version + switch( ntohl( v1hdr->rmr_ver ) ) { + case 1: + memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header + nm->payload = (void *) v1hdr + sizeof( *v1hdr ); + break; + + default: // current message always caught here + hdr = nm->header; + memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data + nm->payload = nm->header + PAYLOAD_OFFSET( hdr ); // point at the payload + break; + } + + // --- these are all version agnostic ----------------------------------- nm->mtype = old_msg->mtype; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload - nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) - nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area + + nm->xaction = hdr->xid; // reference xaction nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation) nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message - - memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID ); memcpy( nm->payload, old_msg->payload, old_msg->len ); return nm; @@ -276,16 +334,14 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->tp_buf = NULL; } else { - //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer } + msg->alloc_len = 0; msg->len = 0; msg->payload = NULL; msg->xaction = NULL; - //rsize = msg->alloc_len; // set to max, and we'll get len back here too - //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS ); // total space (header + payload len) allocated msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) { return msg; @@ -297,20 +353,22 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { } rsize = nng_msg_len( msg->tp_buf ); - if( rsize >= sizeof( uta_mhdr_t ) ) { // we need at least a full header here - - ref_tpbuf( msg, rsize ); // point payload, header etc to the just received tp buffer + if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here + ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed hdr = (uta_mhdr_t *) msg->header; - msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src - if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) { // way more than we should have had room for; error - msg->state = RMR_ERR_TRUNC; - } + msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header ); } else { - msg->len = 0; msg->state = RMR_ERR_EMPTY; + msg->len = 0; + msg->alloc_len = rsize; + msg->payload = NULL; + msg->xaction = NULL; + msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + msg->mtype = -1; } return msg; @@ -386,8 +444,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock errno = 0; msg->state = RMR_OK; if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer - //nng_flags |= NNG_FLAG_ALLOC; // indicate a zc buffer that nng is expected to free - do { if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure msg->state = state; @@ -409,6 +465,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock } } while( state && retries > 0 ); } else { + // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else msg->state = RMR_ERR_SENDFAILED; errno = ENOTSUP; return msg; @@ -434,8 +491,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock msg->state = RMR_ERR_RETRY; // errno will have nano reason } else { msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno - //errno = -msg->state; - //msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason } if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );