3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: These are static send/receive related functions.
26 Author: E. Scott Daniels
27 Date: 13 February 2019
45 #include <arpa/inet.h>
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
53 #include "rmr.h" // things the users see
54 #include "rmr_private.h" // things that we need too
55 #include "rmr_symtab.h"
57 #include "ring_static.c" // message ring support
58 #include "rt_generic_static.c" // generic route table (not nng/nano specific)
59 #include "rtable_static.c" // route table things (nano specific)
60 #include "tools_static.c"
65 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66 a new message struct as well. Size is the size of the zc buffer to allocate (not
67 including our header). If size is 0, then the buffer allocated is the size previously
68 allocated (if msg is !nil) or the default size given at initialisation.
// Prepare a message whose transport buffer comes from nn_allocmsg() and whose header is initialised.
//   ctx   - supplies max_plen (default payload size), trace_data_len and my_name
//   msg   - message struct to fill in; a malloc() of sizeof *msg is shown below for obtaining one
//           (presumably only when msg is NULL -- confirm)
//   size  - requested payload bytes; when not > 0, ctx->max_plen is used instead
//   state - stored as-is in msg->state
// NOTE(review): declarations, branch conditions and the return statement are not visible in this
// excerpt; the comments below describe only the statements that are shown.
70 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
74 mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
75 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
78 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
80 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
// when a message is supplied its previous alloc_len replaces the size computed above, so the
// size parameter has no effect on that path; alloc_len must be read before the memset below
84 mlen = msg->alloc_len; // msg given, allocate the same size as before
// NOTE(review): zeroing the struct discards any header pointer a caller-supplied msg was holding;
// no release of that buffer is visible in this excerpt -- confirm callers pass only sent/empty messages
87 memset( msg, 0, sizeof( *msg ) );
89 if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
90 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
94 hdr = (uta_mhdr_t *) msg->header;
95 hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version
97 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
98 //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas
99 //SET_HDR_D2_LEN( hdr, ctx->d1_len );
101 msg->len = 0; // length of data in the payload
102 msg->alloc_len = mlen; // total bytes requested from nn_allocmsg() above (header size is included when mlen is computed here)
103 msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // point at the payload in transport
104 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
105 msg->state = state; // fill in caller's state (likely the state of the last operation)
106 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
// strncpy() writes at most RMR_MAX_SID bytes and adds no terminator if my_name is that long or longer
// NOTE(review): assumes the src field holds at least RMR_MAX_SID bytes -- field size not visible here
107 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
109 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
115 This will clone a message into a new zero copy buffer and return the cloned message.
// Duplicate old_msg: a new rmr_mbuf_t is malloc'd, a transport buffer of old_msg->alloc_len bytes
// is obtained with nn_allocmsg(), and the header (RMR_HDR_LEN bytes) and payload (old_msg->len
// bytes) are copied across.
// NOTE(review): the body of the NULL check, the allocation-failure handling and the return
// statement are not visible in this excerpt.
117 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
118 rmr_mbuf_t* nm; // new message buffer
121 if( old_msg == NULL ) {
125 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
127 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
130 memset( nm, 0, sizeof( *nm ) );
132 mlen = old_msg->alloc_len; // length allocated before
133 if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
134 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
138 memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
140 nm->mtype = old_msg->mtype;
141 nm->len = old_msg->len; // length of data in the payload
142 nm->alloc_len = mlen; // size passed to nn_allocmsg() above (same value as the source message's alloc_len)
// payload is located from the header just copied into the new buffer
143 nm->payload = nm->header + PAYLOAD_OFFSET( nm->header );
144 nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
145 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
// nm was zeroed above and old_msg->flags is not copied in the visible lines, so this is the only flag set here
146 nm->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
// NOTE(review): no visible check that old_msg->len fits in the space after the header in the new buffer
147 memcpy( nm->payload, old_msg->payload, old_msg->len );
153 This is the receive work horse used by the outer layer receive functions.
154 It waits for a message to be received on our listen socket. If old msg
155 is passed in, then we assume we can use it instead of allocating a new
156 one, else a new block of memory is allocated.
158 This allocates a zero copy message so that if the user wishes to call
159 uta_rts_msg() the send is zero copy.
// Receive one message from ctx->nn_sock with nn_recv() into the message's header buffer and
// fill in len, mtype, payload and xaction from the received header.
//   ctx     - context; nn_sock is the socket read from
//   old_msg - NOTE(review): not referenced in the lines visible here; any reuse logic is not shown -- confirm
// NOTE(review): declarations, some branch heads/closing braces and the return statements are not
// visible in this excerpt.
161 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
// this local is not used by any statement shown below; the receive uses ctx->nn_sock
162 int nn_sock; // endpoint socket for send
164 rmr_mbuf_t* msg = NULL; // msg received
170 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
// state temporarily holds the nn_recv() result (byte count, or negative on failure)
173 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // total space (header + payload len) allocated
174 if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here
175 hdr = (uta_mhdr_t *) msg->header;
176 msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size)
// sender-declared payload length is larger than the bytes received after the header
177 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
// NOTE(review): state is overwritten with RMR_ERR_TRUNC here, and the very next statement then
// computes len from state -- i.e. from the error constant, not the received byte count.
// These two assignments look like they are in the wrong order; confirm and swap if so.
178 msg->state = RMR_ERR_TRUNC;
179 msg->len = msg->state - RMR_HDR_LEN( hdr );
181 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
183 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
184 msg->payload = msg->header + PAYLOAD_OFFSET( msg->header );
185 msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id
186 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
187 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
// presumably reached when the receive did not yield more than a header's worth of bytes -- confirm
190 msg->state = RMR_ERR_EMPTY;
198 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
199 message buffer cannot be used to send, and the length information may or may
200 not be correct (it is set to the length received which might be more than the
201 bytes actually in the payload).
// Receive a header-less ("raw") buffer from ctx->nn_sock; the received bytes are exposed through
// msg->payload and the message is flagged MFL_RAW.
//   ctx     - context; nn_sock is the socket read from
//   old_msg - NOTE(review): not referenced in the lines visible here -- confirm whether it is reused
// NOTE(review): the function is declared to return void* while the local it fills is an
// rmr_mbuf_t*; the return statements are not visible in this excerpt.
203 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
// this local is not used by any statement shown below; the receive uses ctx->nn_sock
204 int nn_sock; // endpoint socket for send
206 rmr_mbuf_t* msg = NULL; // msg received
211 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
214 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length
// any non-negative result, including zero bytes, is treated as a received payload
215 if( msg->state >= 0 ) {
218 msg->len = msg->state; // no header; len is the entire thing received
// plain assignment: every flag set earlier (e.g. MFL_ZEROCOPY from alloc_zcmsg) is replaced
220 msg->flags = MFL_RAW; // prevent any sending of this headerless buffer
// the data starts at the very beginning of the transport buffer since no RMR header is expected
221 msg->payload = msg->header;
222 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
// presumably the path taken when nn_recv() reported a failure -- confirm
225 msg->state = RMR_ERR_EMPTY;
235 This does the hard work of actually sending the message to the given socket. On success,
236 a new message struct is returned. On error, the original msg is returned with the state
237 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
238 buffer will not be allocated and returned (mostly for call() internal processing since
239 the return message from call() is a received buffer, not a new one).
241 Called by rmr_send_msg() and rmr_rts_msg().
// Write mtype and len into the transport header in network byte order, optionally overlay the
// source name, and hand the buffer to nn_send() on nn_sock without blocking (NN_DONTWAIT).
//   ctx     - context; my_name is used when the source must be overlaid
//   msg     - message to send; its header buffer is modified in place
//   nn_sock - nanomsg socket to send on
// NOTE(review): declarations, several branch heads/closing braces, and the assignments made in
// the nn_send() failure branches are not visible in this excerpt.
243 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
247 // future: ensure that application did not overrun the XID buffer; last byte must be 0
249 hdr = (uta_mhdr_t *) msg->header;
250 hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport
251 hdr->plen = htonl( msg->len );
253 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
// strncpy() adds no terminator if my_name is RMR_MAX_SID bytes or longer
254 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
257 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
// NN_MSG with the address of the buffer pointer: the nn_allocmsg() buffer itself is handed to nanomsg
258 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
261 msg->header = NULL; // nano frees; don't risk accessing later by mistake
// NOTE(review): the byte count uses sizeof( uta_mhdr_t ), whereas other functions in this file
// locate the payload with RMR_HDR_LEN()/PAYLOAD_OFFSET(); if the header region can be larger
// than the struct (e.g. trace data) this count would be short -- confirm
264 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
269 // future: if nano sends bytes, but less than mlen, then what to do?
// NOTE(review): this tests msg->state while the nn_send() results above land in the local
// 'state'; presumably the failure branches copy state into msg->state -- not visible, confirm
270 if( msg->state >= 0 ) { // successful send
271 if( !(msg->flags & MFL_NOALLOC) ) { // if noalloc is set, then caller doesn't want a new buffer
272 return alloc_zcmsg( ctx, msg, 0, RMR_OK ); // preallocate a zero-copy buffer and return msg
274 rmr_free_msg( msg ); // not wanting a message back, trash this one
277 } else { // send failed -- return original message
// EAGAIN is mapped to RMR_ERR_RETRY; the other visible outcome is RMR_ERR_SENDFAILED
278 if( errno == EAGAIN ) {
279 msg->state = RMR_ERR_RETRY; // some wrappers can't see errno, make this obvious
281 msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason
283 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
291 A generic wrapper to the real send to keep wormhole stuff agnostic.
292 We assume the wormhole function vetted the buffer so we don't have to.
// Send msg to the endpoint's socket by forwarding to send_msg() with ep->nn_sock; the result of
// send_msg() is returned unchanged. ep and msg are used without any checks here.
294 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
295 return send_msg( ctx, msg, ep->nn_sock );