feat(API): Add subscription id and source retrieval
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
index cddb662..1c2bec8 100644 (file)
        a new message struct as well. Size is the size of the zc buffer to allocate (not
        including our header). If size is 0, then the buffer allocated is the size previously
        allocated (if msg is !nil) or the default size given at initialisation).
+
+
+       The trlo parm is the trace length override which will be used if not 0. If 0, then the
+       length in the context is used (default).
 */
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
        int     mlen;
+       uta_mhdr_t*     hdr;
+       int tr_len;                             // length to allocate for trace info
 
-       mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
-       mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
+       tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
+
+       mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
+       mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
 
        if( msg == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
@@ -90,11 +98,18 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                exit( 1 );
        }
 
-       ((uta_mhdr_t *) msg->header)->rmr_ver = RMR_MSG_VER;    // version info should we need to recognised old style messages someday
+       hdr = (uta_mhdr_t *) msg->header;
+       hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
+       hdr->sub_id = htonl( UNSET_SUBID );
+       SET_HDR_LEN( hdr );
+       SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
+       //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
+       //SET_HDR_D2_LEN( hdr, ctx->d1_len );
+
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
-       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                                                // point at transaction id in header area
+       msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
+       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
@@ -128,14 +143,80 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                exit( 1 );
        }
 
+       memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
+
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
+       nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
        nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
-       memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
+
+       return nm;
+}
+
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       hdr = old_msg->header;
+       tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
+
+       mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
+       if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
+               fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
+               exit( 1 );
+       }
+
+       nm->tp_buf = nm->header;                                                                // in nano both are the same
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
+                       if( RMR_D1_LEN( hdr )  ) {
+                               memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       
+                       }
+                       if( RMR_D2_LEN( hdr )  ) {
+                               memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       }
+
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // len MUST be set before pointing payload
+                       nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
+                       break;
+       }
+               
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
 
        return nm;
@@ -159,21 +240,22 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
        if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
                hdr = (uta_mhdr_t *) msg->header;
                msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
-               if( msg->len > msg->state - sizeof( uta_mhdr_t ) ) {
-                       fprintf( stderr, "[WARN] rmr_rcv indicated payload length < rcvd payload: expected %d got %ld\n", 
-                               msg->len, msg->state - sizeof( uta_mhdr_t ) );
+               if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
+                       msg->state = RMR_ERR_TRUNC;
+                       msg->len = msg->state - RMR_HDR_LEN( hdr );
                }
                msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
+               msg->sub_id = ntohl( hdr->sub_id );                                                             // capture and convert from network order to local order
                msg->state = RMR_OK;
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
-               msg->payload = msg->header + sizeof( uta_mhdr_t );
+               msg->payload = PAYLOAD_ADDR( msg->header );
                msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
@@ -200,13 +282,14 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
        if( msg->state >= 0 ) {
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
                msg->len = msg->state;                                                                          // no header; len is the entire thing received
                msg->state = RMR_OK;
                msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
@@ -217,7 +300,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->state = RMR_ERR_EMPTY;
                msg->payload = NULL;
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
        }
 
        return msg;
@@ -235,17 +319,20 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        int state;
        uta_mhdr_t*     hdr;
+       int     tr_len;                                 // length from the message being sent (must snarf before send to use after send)
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
-       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
        hdr->plen = htonl( msg->len );
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
        }
 
+       tr_len = RMR_TR_LEN( hdr );
        if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
                if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
                        msg->state = state;
@@ -259,9 +346,9 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        }
 
        // future:  if nano sends bytes, but less than mlen, then what to do?
-       if( msg->state >= 0 ) {                                                         // successful send
-               if( !(msg->flags & MFL_NOALLOC) ) {                             // if noalloc is set, then caller doesn't want a new buffer
-                       return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
+       if( msg->state >= 0 ) {                                                                         // successful send
+               if( !(msg->flags & MFL_NOALLOC) ) {                                             // if noalloc is set, then caller doesn't want a new buffer
+                       return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );      // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
                } else {
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;