feat(routing): Support session based routing
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
index cddb662..18acdda 100644 (file)
@@ -1,14 +1,14 @@
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
        a new message struct as well. Size is the size of the zc buffer to allocate (not
        including our header). If size is 0, then the buffer allocated is the size previously
        allocated (if msg is !nil) or the default size given at initialisation).
+
+
+       The trlo parm is the trace length override which will be used if not 0. If 0, then the
+       length in the context is used (default).
 */
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
        int     mlen;
+       uta_mhdr_t*     hdr;
+       int tr_len;                             // length to allocate for trace info
 
-       mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
-       mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
+       tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
+
+       mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
+       mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
 
        if( msg == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
@@ -90,11 +98,22 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                exit( 1 );
        }
 
-       ((uta_mhdr_t *) msg->header)->rmr_ver = RMR_MSG_VER;    // version info should we need to recognised old style messages someday
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                 // must ensure that header portion of tpbuf is 0
+       msg->tp_buf = msg->header;
+       hdr = (uta_mhdr_t *) msg->header;
+       hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
+       hdr->sub_id = htonl( UNSET_SUBID );
+       SET_HDR_LEN( hdr );
+       SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
+       //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
+       //SET_HDR_D2_LEN( hdr, ctx->d1_len );
+
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
-       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                                                // point at transaction id in header area
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
+       msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
+       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
@@ -128,14 +147,80 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                exit( 1 );
        }
 
+       memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
+
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
+       nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
        nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
-       memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
+
+       return nm;
+}
+
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       hdr = old_msg->header;
+       tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
+
+       mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
+       if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
+               fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
+               exit( 1 );
+       }
+
+       nm->tp_buf = nm->header;                                                                // in nano both are the same
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
+                       if( RMR_D1_LEN( hdr )  ) {
+                               memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+
+                       }
+                       if( RMR_D2_LEN( hdr )  ) {
+                               memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       }
+
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // len MUST be set before pointing payload
+                       nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
+                       break;
+       }
+
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
 
        return nm;
@@ -159,23 +244,24 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
        if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
                hdr = (uta_mhdr_t *) msg->header;
                msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
-               if( msg->len > msg->state - sizeof( uta_mhdr_t ) ) {
-                       fprintf( stderr, "[WARN] rmr_rcv indicated payload length < rcvd payload: expected %d got %ld\n", 
-                               msg->len, msg->state - sizeof( uta_mhdr_t ) );
+               if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
+                       msg->state = RMR_ERR_TRUNC;
+                       msg->len = msg->state - RMR_HDR_LEN( hdr );
                }
                msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
+               msg->sub_id = ntohl( hdr->sub_id );                                                             // capture and convert from network order to local order
                msg->state = RMR_OK;
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
-               msg->payload = msg->header + sizeof( uta_mhdr_t );
+               msg->payload = PAYLOAD_ADDR( msg->header );
                msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
-               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
+               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
                msg->len = 0;
@@ -188,7 +274,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
 /*
        Receives a 'raw' message from a non-RMr sender (no header expected). The returned
-       message buffer cannot be used to send, and the length information may or may 
+       message buffer cannot be used to send, and the length information may or may
        not be correct (it is set to the length received which might be more than the
        bytes actually in the payload).
 */
@@ -200,13 +286,14 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
        if( msg->state >= 0 ) {
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
                msg->len = msg->state;                                                                          // no header; len is the entire thing received
                msg->state = RMR_OK;
                msg->flags = MFL_RAW;                                                                           // prevent any sending of this headerless buffer
@@ -217,7 +304,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->state = RMR_ERR_EMPTY;
                msg->payload = NULL;
                msg->xaction = NULL;
-               msg->mtype = -1;
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
        }
 
        return msg;
@@ -225,8 +313,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
 /*
        This does the hard work of actually sending the message to the given socket. On success,
-       a new message struct is returned. On error, the original msg is returned with the state 
-       set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
+       a new message struct is returned. On error, the original msg is returned with the state
+       set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
        buffer will not be allocated and returned (mostly for call() interal processing since
        the return message from call() is a received buffer, not a new one).
 
@@ -235,17 +323,21 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        int state;
        uta_mhdr_t*     hdr;
+       int     tr_len;                                 // length from the message being sent (must snarf before send to use after send)
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
+       //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
        hdr = (uta_mhdr_t *) msg->header;
-       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
        hdr->plen = htonl( msg->len );
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
        }
 
+       tr_len = RMR_TR_LEN( hdr );
        if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
                if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
                        msg->state = state;
@@ -255,13 +347,13 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        } else {
                if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
                        msg->state = state;
-               } 
+               }
        }
 
        // future:  if nano sends bytes, but less than mlen, then what to do?
-       if( msg->state >= 0 ) {                                                         // successful send
-               if( !(msg->flags & MFL_NOALLOC) ) {                             // if noalloc is set, then caller doesn't want a new buffer
-                       return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
+       if( msg->state >= 0 ) {                                                                         // successful send
+               if( !(msg->flags & MFL_NOALLOC) ) {                                             // if noalloc is set, then caller doesn't want a new buffer
+                       return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );      // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
                } else {
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;