X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fnanomsg%2Fsrc%2Fsr_static.c;h=18acdda51b9d96379d539b77c261b9deba050983;hb=a41c6f5f26b3a44009f4aff3df3f83b9a79ace01;hp=0f59da2a1f8b0175c88a4179dfd23376cc73eab4;hpb=d710957ed5d73bf2da2ceea3f5a1a3c509275c30;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nanomsg/src/sr_static.c b/src/nanomsg/src/sr_static.c index 0f59da2..18acdda 100644 --- a/src/nanomsg/src/sr_static.c +++ b/src/nanomsg/src/sr_static.c @@ -1,14 +1,14 @@ // :vi sw=4 ts=4 noet: /* ================================================================================== - Copyright (c) 2019 Nokia + Copyright (c) 2019 Nokia Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -98,8 +98,11 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s exit( 1 ); } + memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // must ensure that header portion of tpbuf is 0 + msg->tp_buf = msg->header; hdr = (uta_mhdr_t *) msg->header; hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version + hdr->sub_id = htonl( UNSET_SUBID ); SET_HDR_LEN( hdr ); SET_HDR_TR_LEN( hdr, tr_len ); // set the actual length used //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas @@ -107,6 +110,8 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s msg->len = 0; // length of data in the payload msg->alloc_len = mlen; // length of allocated payload + msg->sub_id = UNSET_SUBID; + msg->mtype = UNSET_MSGTYPE; msg->payload = PAYLOAD_ADDR( hdr ); // point at the payload in transport msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area msg->state = state; // fill in caller's state (likely the state of the last operation) @@ -145,6 +150,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload nm->payload = PAYLOAD_ADDR( nm->header ); // reference the payload @@ -195,7 +201,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed if( RMR_D1_LEN( hdr ) ) { memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary - + } if( RMR_D2_LEN( hdr ) ) { memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) ); // copy data1 and data2 if necessary @@ -205,9 +211,10 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { nm->payload = PAYLOAD_ADDR( hdr ); // reference user payload break; } - + // --- these are all version agnostic ----------------------------------- nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload @@ -249,11 +256,12 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->len = msg->state - RMR_HDR_LEN( hdr ); } msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order + msg->sub_id = ntohl( hdr->sub_id ); // capture and convert from network order to local order msg->state = RMR_OK; msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src msg->payload = PAYLOAD_ADDR( msg->header ); msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header ); } else { msg->len = 0; @@ -266,7 +274,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { /* Receives a 'raw' message from a non-RMr sender (no header expected). The returned - message buffer cannot be used to send, and the length information may or may + message buffer cannot be used to send, and the length information may or may not be correct (it is set to the length received which might be more than the bytes actually in the payload). */ @@ -284,7 +292,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length if( msg->state >= 0 ) { msg->xaction = NULL; - msg->mtype = -1; + msg->mtype = UNSET_MSGTYPE; + msg->sub_id = UNSET_SUBID; msg->len = msg->state; // no header; len is the entire thing received msg->state = RMR_OK; msg->flags = MFL_RAW; // prevent any sending of this headerless buffer @@ -295,7 +304,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->state = RMR_ERR_EMPTY; msg->payload = NULL; msg->xaction = NULL; - msg->mtype = -1; + msg->mtype = UNSET_MSGTYPE; + msg->sub_id = UNSET_SUBID; } return msg; @@ -303,8 +313,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { /* This does the hard work of actually sending the message to the given socket. On success, - a new message struct is returned. On error, the original msg is returned with the state - set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new + a new message struct is returned. On error, the original msg is returned with the state + set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new buffer will not be allocated and returned (mostly for call() interal processing since the return message from call() is a received buffer, not a new one). @@ -317,8 +327,10 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) { // future: ensure that application did not overrun the XID buffer; last byte must be 0 + //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype ); hdr = (uta_mhdr_t *) msg->header; - hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport + hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport + hdr->sub_id = htonl( msg->sub_id ); hdr->plen = htonl( msg->len ); if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source @@ -335,7 +347,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) { } else { if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) { msg->state = state; - } + } } // future: if nano sends bytes, but less than mlen, then what to do?