feat(msgs): Add header v2 support
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
index cfea829..369c68e 100644 (file)
@@ -107,10 +107,10 @@ static inline int xlate_nng_state( int state, int def_state ) {
                nng messages.
 */
 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
-       size_t          mlen;
+       size_t          mlen;                   // size of the transport buffer that we'll allocate
        uta_mhdr_t*     hdr;                    // convenience pointer
 
-       mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
+       mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len;  // start with header and trace/data lengths
        mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
 
        if( msg == NULL ) {
@@ -131,12 +131,17 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        }
 
        msg->header = nng_msg_body( msg->tp_buf );
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
        if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
-               hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
+               hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
+               SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
+               SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+               //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
+               //SET_HDR_D2_LEN( hdr, ctx->d2_len );
        }
        msg->len = 0;                                                                                   // length of data in the payload
-       msg->alloc_len = mlen;                                                                  // length of allocated payload
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
+       msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
+       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
@@ -185,21 +190,59 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
        The alen parm is the assumed allocated length; assumed because it's a value likely
        to have come from nng receive and the actual alloc len might be larger, but we
        can only assume this is the total usable space.
+
+       This function returns the message with an error state set if it detects that the
+       received message might have been truncated.  Check is done here as the calculation
+       is somewhat based on header version.
 */
 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
-       uta_mhdr_t* hdr;
+       uta_mhdr_t* hdr;                                // current header
+       uta_v1mhdr_t* v1hdr;                    // version 1 header
+       int ver;
+       int     hlen;                                           // header len to use for a truncation check
 
        msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
+       v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
 
-       hdr = (uta_mhdr_t *) msg->header;
-       hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
-       msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
-       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header)
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
-       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
-       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
-       msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
-       msg->state = RMR_OK;
+       if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order 
+               ver = 1;
+               v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
+       } else {
+               ver = ntohl( v1hdr->rmr_ver );
+       }
+
+       switch( ver ) {
+               case 1:
+                       msg->len = ntohl( v1hdr->plen );                                                        // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+                       msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
+
+                       msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
+                       msg->state = RMR_OK;
+                       hlen = sizeof( uta_v1mhdr_t );
+                       break;
+
+               default:                                                                                                        // current version always lands here
+                       hdr = (uta_mhdr_t *) msg->header;
+                       msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+
+                       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
+                       msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
+                       hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
+                       break;
+       }
+
+       if( msg->len > (msg->alloc_len - hlen ) ) {                                             // more than we should have had room for; error
+               msg->state = RMR_ERR_TRUNC;
+               msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
+       } else {
+               msg->state = RMR_OK;
+       }
 }
 
 /*
@@ -209,6 +252,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        rmr_mbuf_t* nm;                 // new message buffer
        size_t  mlen;
        int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
@@ -223,16 +268,29 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                exit( 1 );
        }
 
-       nm->header = nng_msg_body( nm->tp_buf );
+       nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) );         // copy complete header, trace and other data
+                       nm->payload = nm->header + PAYLOAD_OFFSET( hdr );               // point at the payload
+                       break;
+       }
+               
+       // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
-       nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
-
-       memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
        memcpy( nm->payload, old_msg->payload, old_msg->len );
 
        return nm;
@@ -276,16 +334,14 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
                msg->tp_buf = NULL;
        } else {
-               //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                    // will abort on failure, no need to check
                msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
        }
 
+       msg->alloc_len = 0;
        msg->len = 0;
        msg->payload = NULL;
        msg->xaction = NULL;
 
-       //rsize = msg->alloc_len;                                                                                                               // set to max, and we'll get len back here too
-       //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS );         // total space (header + payload len) allocated
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
        if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
                return msg;
@@ -297,20 +353,22 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        }
 
        rsize = nng_msg_len( msg->tp_buf );
-       if( rsize >= sizeof( uta_mhdr_t ) ) {                                                           // we need at least a full header here
-
-               ref_tpbuf( msg, rsize );                                                // point payload, header etc to the just received tp buffer
+       if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
+               ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
                hdr = (uta_mhdr_t *) msg->header;
-               msg->flags |= MFL_ADDSRC;                               // turn on so if user app tries to send this buffer we reset src
-               if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) {              // way more than we should have had room for; error
-                       msg->state = RMR_ERR_TRUNC;
-               }
+               msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
+
 
                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
-               msg->len = 0;
                msg->state = RMR_ERR_EMPTY;
+               msg->len = 0;
+               msg->alloc_len = rsize;
+               msg->payload = NULL;
+               msg->xaction = NULL;
+               msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
+               msg->mtype = -1;
        }
 
        return msg;
@@ -386,8 +444,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        errno = 0;
        msg->state = RMR_OK;
        if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
-               //nng_flags |= NNG_FLAG_ALLOC;                                                                  // indicate a zc buffer that nng is expected to free
-
                do {
                        if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
                                msg->state = state;
@@ -409,6 +465,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                        }
                } while( state && retries > 0 );
        } else {
+               // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
                msg->state = RMR_ERR_SENDFAILED;
                errno = ENOTSUP;
                return msg;
@@ -434,8 +491,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                        msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
                } else {
                        msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
-                       //errno = -msg->state;
-                       //msg->state = RMR_ERR_SENDFAILED;                                      // errno will have nano reason
                }
 
                if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );