feat(routing): Support session based routing
[ric-plt/lib/rmr.git] / src / nanomsg / src / sr_static.c
index 1c2bec8..18acdda 100644 (file)
@@ -1,14 +1,14 @@
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
@@ -98,6 +98,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                exit( 1 );
        }
 
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                 // must ensure that header portion of tpbuf is 0
+       msg->tp_buf = msg->header;
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
        hdr->sub_id = htonl( UNSET_SUBID );
@@ -108,6 +110,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
@@ -197,7 +201,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                        memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
                        if( RMR_D1_LEN( hdr )  ) {
                                memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
-                       
+
                        }
                        if( RMR_D2_LEN( hdr )  ) {
                                memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
@@ -207,7 +211,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                        nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
                        break;
        }
-               
+
        // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
        nm->sub_id = old_msg->sub_id;
@@ -257,7 +261,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
                msg->payload = PAYLOAD_ADDR( msg->header );
                msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
-               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
+               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
                msg->len = 0;
@@ -270,7 +274,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
 /*
        Receives a 'raw' message from a non-RMr sender (no header expected). The returned
-       message buffer cannot be used to send, and the length information may or may 
+       message buffer cannot be used to send, and the length information may or may
        not be correct (it is set to the length received which might be more than the
        bytes actually in the payload).
 */
@@ -309,8 +313,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
 /*
        This does the hard work of actually sending the message to the given socket. On success,
-       a new message struct is returned. On error, the original msg is returned with the state 
-       set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
+       a new message struct is returned. On error, the original msg is returned with the state
+       set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
        buffer will not be allocated and returned (mostly for call() interal processing since
        the return message from call() is a received buffer, not a new one).
 
@@ -323,6 +327,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
+       //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
        hdr = (uta_mhdr_t *) msg->header;
        hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub-id in network byte order for transport
        hdr->sub_id = htonl( msg->sub_id );
@@ -342,7 +347,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        } else {
                if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
                        msg->state = state;
-               } 
+               }
        }
 
        // future:  if nano sends bytes, but less than mlen, then what to do?