X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnanomsg%2Fsrc%2Fsr_static.c;fp=src%2Frmr%2Fnanomsg%2Fsrc%2Fsr_static.c;h=18acdda51b9d96379d539b77c261b9deba050983;hb=68c1ab2191d9959fde0bd275a560f7c9cf6df485;hp=0000000000000000000000000000000000000000;hpb=f7d44570f8de6e15f768e8e2d9b6061cd0bff11f;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nanomsg/src/sr_static.c b/src/rmr/nanomsg/src/sr_static.c new file mode 100644 index 0000000..18acdda --- /dev/null +++ b/src/rmr/nanomsg/src/sr_static.c @@ -0,0 +1,382 @@ +// :vi sw=4 ts=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: sr_static.c + Abstract: These are static send/receive related functions. + + (broken out of rmr.c) + Author: E. Scott Daniels + Date: 13 February 2019 +*/ + +#ifndef _sr_static_c +#define _sr_static_c + +/* +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "rmr.h" // things the users see +#include "rmr_private.h" // things that we need too +#include "rmr_symtab.h" + +#include "ring_static.c" // message ring support +#include "rt_generic_static.c" // generic route table (not nng/nano specific) +#include "rtable_static.c" // route table things (nano specific) +#include "tools_static.c" +*/ + + +/* + Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc + a new message struct as well. Size is the size of the zc buffer to allocate (not + including our header). If size is 0, then the buffer allocated is the size previously + allocated (if msg is !nil) or the default size given at initialisation). + + + The trlo parm is the trace length override which will be used if not 0. If 0, then the + length in the context is used (default). +*/ +static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) { + int mlen; + uta_mhdr_t* hdr; + int tr_len; // length to allocate for trace info + + tr_len = trlo > 0 ? trlo : ctx->trace_data_len; + + mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths + mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init + + if( msg == NULL ) { + msg = (rmr_mbuf_t *) malloc( sizeof *msg ); + if( msg == NULL ) { + fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" ); + exit( 1 ); + } + } else { + mlen = msg->alloc_len; // msg given, allocate the same size as before + } + + memset( msg, 0, sizeof( *msg ) ); + + if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free + fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno ); + exit( 1 ); + } + + memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // must ensure that header portion of tpbuf is 0 + msg->tp_buf = msg->header; + hdr = (uta_mhdr_t *) msg->header; + hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version + hdr->sub_id = htonl( UNSET_SUBID ); + SET_HDR_LEN( hdr ); + SET_HDR_TR_LEN( hdr, tr_len ); // set the actual length used + //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas + //SET_HDR_D2_LEN( hdr, ctx->d1_len ); + + msg->len = 0; // length of data in the payload + msg->alloc_len = mlen; // length of allocated payload + msg->sub_id = UNSET_SUBID; + msg->mtype = UNSET_MSGTYPE; + msg->payload = PAYLOAD_ADDR( hdr ); // point at the payload in transport + msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area + msg->state = state; // fill in caller's state (likely the state of the last operation) + msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); + + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header ); + + return msg; +} + +/* + This will clone a message into a new zero copy buffer and return the cloned message. +*/ +static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { + rmr_mbuf_t* nm; // new message buffer + int mlen; + + if( old_msg == NULL ) { + return NULL; + } + + nm = (rmr_mbuf_t *) malloc( sizeof *nm ); + if( nm == NULL ) { + fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" ); + exit( 1 ); + } + memset( nm, 0, sizeof( *nm ) ); + + mlen = old_msg->alloc_len; // length allocated before + if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free + fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno ); + exit( 1 ); + } + + memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data + + nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; + nm->len = old_msg->len; // length of data in the payload + nm->alloc_len = mlen; // length of allocated payload + nm->payload = PAYLOAD_ADDR( nm->header ); // reference the payload + nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area + nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation) + nm->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + memcpy( nm->payload, old_msg->payload, old_msg->len ); + + return nm; +} + +static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) { + rmr_mbuf_t* nm; // new message buffer + size_t mlen; + int state; + uta_mhdr_t* hdr; + uta_v1mhdr_t* v1hdr; + int tr_old_len; // tr size in new buffer + + + nm = (rmr_mbuf_t *) malloc( sizeof *nm ); + if( nm == NULL ) { + fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" ); + exit( 1 ); + } + memset( nm, 0, sizeof( *nm ) ); + + hdr = old_msg->header; + tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace + + mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment + if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len ); + if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free + fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno ); + exit( 1 ); + } + + nm->tp_buf = nm->header; // in nano both are the same + v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version + switch( ntohl( v1hdr->rmr_ver ) ) { + case 1: + memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header + nm->payload = (void *) v1hdr + sizeof( *v1hdr ); + break; + + default: // current message always caught here + hdr = nm->header; + memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed + if( RMR_D1_LEN( hdr ) ) { + memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary + + } + if( RMR_D2_LEN( hdr ) ) { + memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) ); // copy data1 and data2 if necessary + } + + SET_HDR_TR_LEN( hdr, tr_len ); // len MUST be set before pointing payload + nm->payload = PAYLOAD_ADDR( hdr ); // reference user payload + break; + } + + // --- these are all version agnostic ----------------------------------- + nm->mtype = old_msg->mtype; + nm->sub_id = old_msg->sub_id; + nm->len = old_msg->len; // length of data in the payload + nm->alloc_len = mlen; // length of allocated payload + + nm->xaction = hdr->xid; // reference xaction + nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation) + nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message + memcpy( nm->payload, old_msg->payload, old_msg->len ); + + return nm; +} + +/* + This is the receive work horse used by the outer layer receive functions. + It waits for a message to be received on our listen socket. If old msg + is passed in, the we assume we can use it instead of allocating a new + one, else a new block of memory is allocated. + + This allocates a zero copy message so that if the user wishes to call + uta_rts_msg() the send is zero copy. +*/ +static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { + int nn_sock; // endpoint socket for send + int state; + rmr_mbuf_t* msg = NULL; // msg received + uta_mhdr_t* hdr; + + if( old_msg ) { + msg = old_msg; + } else { + msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check + } + + msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // total space (header + payload len) allocated + if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here + hdr = (uta_mhdr_t *) msg->header; + msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size) + if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) { + msg->state = RMR_ERR_TRUNC; + msg->len = msg->state - RMR_HDR_LEN( hdr ); + } + msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order + msg->sub_id = ntohl( hdr->sub_id ); // capture and convert from network order to local order + msg->state = RMR_OK; + msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src + msg->payload = PAYLOAD_ADDR( msg->header ); + msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", + msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header ); + } else { + msg->len = 0; + msg->state = RMR_ERR_EMPTY; + } + + return msg; +} + + +/* + Receives a 'raw' message from a non-RMr sender (no header expected). The returned + message buffer cannot be used to send, and the length information may or may + not be correct (it is set to the length received which might be more than the + bytes actually in the payload). +*/ +static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { + int nn_sock; // endpoint socket for send + int state; + rmr_mbuf_t* msg = NULL; // msg received + + if( old_msg ) { + msg = old_msg; + } else { + msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check + } + + msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length + if( msg->state >= 0 ) { + msg->xaction = NULL; + msg->mtype = UNSET_MSGTYPE; + msg->sub_id = UNSET_SUBID; + msg->len = msg->state; // no header; len is the entire thing received + msg->state = RMR_OK; + msg->flags = MFL_RAW; // prevent any sending of this headerless buffer + msg->payload = msg->header; + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len ); + } else { + msg->len = 0; + msg->state = RMR_ERR_EMPTY; + msg->payload = NULL; + msg->xaction = NULL; + msg->mtype = UNSET_MSGTYPE; + msg->sub_id = UNSET_SUBID; + } + + return msg; +} + +/* + This does the hard work of actually sending the message to the given socket. On success, + a new message struct is returned. On error, the original msg is returned with the state + set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new + buffer will not be allocated and returned (mostly for call() interal processing since + the return message from call() is a received buffer, not a new one). + + Called by rmr_send_msg() and rmr_rts_msg(). +*/ +static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) { + int state; + uta_mhdr_t* hdr; + int tr_len; // length from the message being sent (must snarf before send to use after send) + + // future: ensure that application did not overrun the XID buffer; last byte must be 0 + + //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype ); + hdr = (uta_mhdr_t *) msg->header; + hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport + hdr->sub_id = htonl( msg->sub_id ); + hdr->plen = htonl( msg->len ); + + if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source + strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours + } + + tr_len = RMR_TR_LEN( hdr ); + if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer + if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) { + msg->state = state; + } else { + msg->header = NULL; // nano frees; don't risk accessing later by mistake + } + } else { + if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) { + msg->state = state; + } + } + + // future: if nano sends bytes, but less than mlen, then what to do? + if( msg->state >= 0 ) { // successful send + if( !(msg->flags & MFL_NOALLOC) ) { // if noalloc is set, then caller doesn't want a new buffer + return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer) + } else { + rmr_free_msg( msg ); // not wanting a meessage back, trash this one + return NULL; + } + } else { // send failed -- return original message + if( errno == EAGAIN ) { + msg->state = RMR_ERR_RETRY; // some wrappers can't see errno, make this obvious + } else { + msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason + } + if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) ); + } + + return msg; +} + + +/* + A generic wrapper to the real send to keep wormhole stuff agnostic. + We assume the wormhole function vetted the buffer so we don't have to. +*/ +static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) { + return send_msg( ctx, msg, ep->nn_sock ); +} + +#endif