feat(API): Add trace data functions
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
index 369c68e..3592a7b 100644 (file)
@@ -37,7 +37,6 @@
 #include <nng/protocol/pipeline0/push.h>
 #include <nng/protocol/pipeline0/pull.h>
 
-
 /*
        Translates the nng state passed in to one of ours that is suitable to put
        into the message, and sets errno to something that might be useful.
@@ -102,16 +101,22 @@ static inline int xlate_nng_state( int state, int def_state ) {
        including our header). If size is 0, then the buffer allocated is the size previously
        allocated (if msg is !nil) or the default size given at initialisation).
 
+       The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
+       the context value is used.
+
        NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
                are zero copy, however ONLY the message is zero copy. We now allocate and use
                nng messages.
 */
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
        size_t          mlen;                   // size of the transport buffer that we'll allocate
        uta_mhdr_t*     hdr;                    // convenience pointer
+       int                     tr_len;                 // trace data len (default or override)
 
-       mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len;  // start with header and trace/data lengths
-       mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
+       tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
+
+       mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
+       mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
 
        if( msg == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
@@ -141,7 +146,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        }
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
-       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
+       msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
@@ -213,7 +218,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
 
        switch( ver ) {
                case 1:
-                       msg->len = ntohl( v1hdr->plen );                                                        // length sender says is in the payload (received length could be larger)
+                       msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
                        msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
                        msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
 
@@ -229,7 +234,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
                        msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
                        msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
 
-                       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
+                       msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
                        msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
@@ -278,8 +283,80 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
 
                default:                                                                                        // current message always caught  here
                        hdr = nm->header;
-                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) );         // copy complete header, trace and other data
-                       nm->payload = nm->header + PAYLOAD_OFFSET( hdr );               // point at the payload
+                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
+                       nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
+                       break;
+       }
+               
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
+
+       return nm;
+}
+
+/*
+       This will clone a message with a change to the trace area in the header such that
+       it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
+       The orignal message will be left unchanged, and a pointer to the new message is returned.
+       It is not possible to realloc buffers and change the data sizes.
+*/
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+       int     coffset;                        // an offset to something in the header for copy
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       hdr = old_msg->header;
+       tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
+
+       mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
+       if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               exit( 1 );
+       }
+
+       nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data might have changed
+                       if( RMR_D1_LEN( hdr )  ) {
+                               coffset = DATA1_OFFSET( hdr );                                                                                          // offset to d1
+                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       
+                       }
+                       if( RMR_D2_LEN( hdr )  ) {
+                               coffset = DATA2_OFFSET( hdr );                                                                                          // offset to d2
+                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) );          // copy data2 and data2 if necessary
+                       }
+
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // MUST set before pointing payload
+                       nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // do NOT copy old trace data, just set the new header
                        break;
        }
                
@@ -358,7 +435,6 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                hdr = (uta_mhdr_t *) msg->header;
                msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
 
-
                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
@@ -391,7 +467,7 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
@@ -430,12 +506,14 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        uta_mhdr_t*     hdr;
        int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
        int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
+       int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
        hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
        hdr->plen = htonl( msg->len );
+       tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
@@ -462,6 +540,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                                msg->state = RMR_OK;
                                msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
                                msg->tp_buf = NULL;
+                               hdr = NULL;
                        }
                } while( state && retries > 0 );
        } else {
@@ -478,9 +557,9 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                */
        }
 
-       if( msg->state == RMR_OK ) {                                                            // successful send
-               if( !(msg->flags & MFL_NOALLOC) ) {                             // allocate another sendable zc buffer unless told otherwise
-                       return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
+       if( msg->state == RMR_OK ) {                                                                    // successful send
+               if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
+                       return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
                } else {
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;