3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: These are static send/receive related functions.
26 Author: E. Scott Daniels
27 Date: 13 February 2019
45 #include <arpa/inet.h>
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
53 #include "rmr.h" // things the users see
54 #include "rmr_private.h" // things that we need too
55 #include "rmr_symtab.h"
57 #include "ring_static.c" // message ring support
58 #include "rt_generic_static.c" // generic route table (not nng/nano specific)
59 #include "rtable_static.c" // route table things (nano specific)
60 #include "tools_static.c"
65 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66 a new message struct as well. Size is the size of the zc buffer to allocate (not
67 including our header). If size is 0, then the buffer allocated is the size previously
68 allocated (if msg is !nil) or the default size given at initialisation.
71 The trlo parm is the trace length override which will be used if not 0. If 0, then the
72 length in the context is used (default).
/*
	alloc_zcmsg: attach a freshly allocated nanomsg buffer (nn_allocmsg) to a message
	struct and initialise the RMR header inside that buffer.

	ctx   - supplies trace_data_len, d1_len, d2_len, max_plen and my_name
	msg   - message struct to fill in; the malloc below supplies one when needed
	size  - payload bytes wanted; values <= 0 select ctx->max_plen
	state - value stored in msg->state
	trlo  - trace length override; values <= 0 select ctx->trace_data_len

	NOTE(review): this listing omits some original lines (the embedded numbering has
	gaps), so the declarations of mlen/hdr, the branch conditions, the handling that
	follows the [CRIT] messages and the return statement are not visible here.
*/
74 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
77 int tr_len; // length to allocate for trace info
79 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
81 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
82 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
85 msg = (rmr_mbuf_t *) malloc( sizeof *msg ); // message struct itself comes from the heap
87 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
// NOTE(review): the assignment below replaces the mlen computed above, so size appears
// not to be consulted when an existing msg is supplied -- confirm against the full file
91 mlen = msg->alloc_len; // msg given, allocate the same size as before
94 memset( msg, 0, sizeof( *msg ) ); // wipe every field (header pointer and flags included) before refilling
96 if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
97 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
101 hdr = (uta_mhdr_t *) msg->header;
102 hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version, stored in network byte order
104 SET_HDR_TR_LEN( hdr, tr_len ); // set the actual length used
105 //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas
106 //SET_HDR_D2_LEN( hdr, ctx->d1_len );
// NOTE(review): the disabled D2 line above passes d1_len; presumably d2_len is wanted if it is ever enabled
108 msg->len = 0; // length of data in the payload
109 msg->alloc_len = mlen; // total bytes in the transport buffer (header, trace/data areas and payload)
110 msg->payload = PAYLOAD_ADDR( hdr ); // point at the payload in transport
111 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
112 msg->state = state; // fill in caller's state (likely the state of the last operation)
113 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message (flags were zeroed by the memset)
// NOTE(review): strncpy leaves src without a terminating 0 when my_name is RMR_MAX_SID
// bytes or longer -- confirm the size of the src field and of my_name
114 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
116 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
122 This will clone a message into a new zero copy buffer and return the cloned message.
/*
	clone_msg: build a new message struct backed by a new nn_allocmsg buffer of the
	same alloc_len as old_msg, then copy the header (RMR_HDR_LEN bytes) and the
	payload (old_msg->len bytes) into it.

	NOTE(review): the body of the NULL guard, the check that follows malloc, the
	handling after the [CRIT] messages and the return statement are not visible in
	this listing.
*/
124 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
125 rmr_mbuf_t* nm; // new message buffer
128 if( old_msg == NULL ) { // nothing to clone
132 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
134 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
137 memset( nm, 0, sizeof( *nm ) ); // start from an all-zero struct
139 mlen = old_msg->alloc_len; // length allocated before
140 if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
141 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
145 memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
147 nm->mtype = old_msg->mtype;
148 nm->len = old_msg->len; // length of data in the payload
149 nm->alloc_len = mlen; // total bytes in the transport buffer (same as the original's)
150 nm->payload = PAYLOAD_ADDR( nm->header ); // reference the payload
151 nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
152 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
153 nm->flags |= MFL_ZEROCOPY; // zerocopy sendable; struct was zeroed, so old_msg's flags are not carried over
154 memcpy( nm->payload, old_msg->payload, old_msg->len ); // user data last, once payload points into the new buffer
/*
	realloc_msg: build a new message whose transport buffer has room for tr_len bytes
	of trace data, copying the header, data areas and payload from old_msg. The new
	buffer size is the old alloc_len adjusted by the change in trace length.

	Fixes applied to the visible lines:
	  - the malloc failure message now carries the [CRIT] tag and this function's
	    name, matching the other messages in this file;
	  - the version 1 branch now copies the old header into the NEW buffer and points
	    the payload into the NEW buffer. Previously v1hdr aliased old_msg->header, so
	    the memcpy had identical source and destination and nm->payload referenced
	    the old buffer.

	NOTE(review): several original lines (declarations, case labels, breaks, closing
	braces, the return) are not visible in this listing. In particular the line
	expected to re-point hdr at nm->header before the default-branch copy is not
	shown; confirm that nm->xaction below refers to the new buffer for every version.
*/
159 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
160 rmr_mbuf_t* nm; // new message buffer
165 int tr_old_len; // trace size in the old buffer
168 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
170 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for message buffer\n" );
173 memset( nm, 0, sizeof( *nm ) );
175 hdr = old_msg->header;
176 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
178 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
179 if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
180 if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
181 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
185 nm->tp_buf = nm->header; // in nano both are the same
186 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
187 switch( ntohl( v1hdr->rmr_ver ) ) {
189 memcpy( nm->header, v1hdr, sizeof( *v1hdr ) ); // copy complete v1 header from the old buffer into the new one
190 nm->payload = (void *) nm->header + sizeof( *v1hdr ); // v1 payload follows the fixed header in the new buffer
193 default: // current message always caught here
195 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
196 if( RMR_D1_LEN( hdr ) ) {
197 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) ); // copy data1 only when present
200 if( RMR_D2_LEN( hdr ) ) {
201 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) ); // copy data2 only when present
204 SET_HDR_TR_LEN( hdr, tr_len ); // len MUST be set before pointing payload
205 nm->payload = PAYLOAD_ADDR( hdr ); // reference user payload
209 // --- these are all version agnostic -----------------------------------
210 nm->mtype = old_msg->mtype;
211 nm->len = old_msg->len; // length of data in the payload
212 nm->alloc_len = mlen; // total bytes in the new transport buffer
214 nm->xaction = hdr->xid; // reference xaction
215 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
216 nm->flags = old_msg->flags | MFL_ZEROCOPY; // keep the old flags and mark as zerocopy sendable
217 memcpy( nm->payload, old_msg->payload, old_msg->len ); // user data last, once payload is positioned
223 This is the receive work horse used by the outer layer receive functions.
224 It waits for a message to be received on our listen socket. If old msg
225 is passed in, then we assume we can use it instead of allocating a new
226 one, else a new block of memory is allocated.
228 This allocates a zero copy message so that if the user wishes to call
229 uta_rts_msg() the send is zero copy.
/*
	rcv_msg: read one message from ctx->nn_sock into a message buffer and fill in
	the message fields (len, mtype, payload, xaction) from the received RMR header.
	When nn_recv does not deliver more than sizeof( uta_mhdr_t ) bytes the state is
	set to RMR_ERR_EMPTY.

	Fix applied: when the header claims more payload than was received, the payload
	length is now clamped BEFORE msg->state is replaced by RMR_ERR_TRUNC. Previously
	the state was overwritten first, so the length was computed from the error code
	rather than from the byte count returned by nn_recv.

	NOTE(review): several original lines (the rc/hdr declarations, the old_msg
	test, any state reset after the header is parsed, closing braces and the return
	statements) are not visible in this listing.
*/
231 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
232 int nn_sock; // not referenced in the lines shown; the receive uses ctx->nn_sock
234 rmr_mbuf_t* msg = NULL; // msg received
240 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
243 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // total space (header + payload len) allocated
244 if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here
245 hdr = (uta_mhdr_t *) msg->header;
246 msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size)
247 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) { // header claims more payload than arrived
248 msg->len = msg->state - RMR_HDR_LEN( hdr ); // clamp first: state still holds the byte count from nn_recv
249 msg->state = RMR_ERR_TRUNC; // only now replace the byte count with the error code
251 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
253 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
254 msg->payload = PAYLOAD_ADDR( msg->header );
255 msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id
256 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
257 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
260 msg->state = RMR_ERR_EMPTY; // nothing usable was received
268 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
269 message buffer cannot be used to send, and the length information may or may
270 not be correct (it is set to the length received which might be more than the
271 bytes actually in the payload).
/*
	rcv_payload: read one headerless message from ctx->nn_sock. The received bytes
	land at msg->header, which is then also used as the payload pointer, and the
	length is the nn_recv return value. A negative nn_recv result leads to
	RMR_ERR_EMPTY.

	NOTE(review): the declared return type is void* although the function works on
	an rmr_mbuf_t; the return statements, the old_msg test and a few assignments
	are not visible in this listing.
*/
273 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
274 int nn_sock; // not referenced in the lines shown; the receive uses ctx->nn_sock
276 rmr_mbuf_t* msg = NULL; // msg received
281 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
284 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length
285 if( msg->state >= 0 ) { // a zero length read is still treated as a message
288 msg->len = msg->state; // no header; len is the entire thing received
290 msg->flags = MFL_RAW; // plain assignment: replaces every other flag; prevents any sending of this headerless buffer
291 msg->payload = msg->header; // data starts at the front of the transport buffer
292 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
295 msg->state = RMR_ERR_EMPTY; // receive failed
305 This does the hard work of actually sending the message to the given socket. On success,
306 a new message struct is returned. On error, the original msg is returned with the state
307 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
308 buffer will not be allocated and returned (mostly for call() internal processing since
309 the return message from call() is a received buffer, not a new one).
311 Called by rmr_send_msg() and rmr_rts_msg().
/*
	send_msg: write mtype and len into the header in network byte order, overlay our
	name as the source when MFL_ADDSRC is set, and hand the message to nn_send with
	NN_DONTWAIT. After a successful send either a new zero copy buffer with the same
	trace length is allocated via alloc_zcmsg, or (MFL_NOALLOC) the message is
	released. After a failed send msg->state is set to RMR_ERR_RETRY or
	RMR_ERR_SENDFAILED based on errno.

	NOTE(review): several original lines (the hdr/state declarations, the lines
	that presumably store state into msg->state, else branches, closing braces and
	return statements) are not visible in this listing.
*/
313 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
316 int tr_len; // length from the message being sent (must snarf before send to use after send)
318 // future: ensure that application did not overrun the XID buffer; last byte must be 0
320 hdr = (uta_mhdr_t *) msg->header;
321 hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport
322 hdr->plen = htonl( msg->len );
324 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
325 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
328 tr_len = RMR_TR_LEN( hdr ); // captured now; header is set to NULL after a zero copy send
329 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
330 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) { // NN_MSG: pass the address of the buffer pointer
333 msg->header = NULL; // nano frees; don't risk accessing later by mistake
// NOTE(review): the length below counts only the fixed header plus payload; elsewhere
// RMR_HDR_LEN( hdr ) is used for header + trace/data areas -- confirm this is intended
336 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
341 // future: if nano sends bytes, but less than mlen, then what to do?
// NOTE(review): the test reads msg->state while nn_send results land in the local
// state; the line that copies one into the other is not visible in this listing
342 if( msg->state >= 0 ) { // successful send
343 if( !(msg->flags & MFL_NOALLOC) ) { // if noalloc is set, then caller doesn't want a new buffer
344 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
346 rmr_free_msg( msg ); // not wanting a message back, trash this one
349 } else { // send failed -- return original message
350 if( errno == EAGAIN ) {
351 msg->state = RMR_ERR_RETRY; // some wrappers can't see errno, make this obvious
353 msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason
355 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
363 A generic wrapper to the real send to keep wormhole stuff agnostic.
364 We assume the wormhole function vetted the buffer so we don't have to.
/*
	send2ep: forward msg to send_msg using the nanomsg socket stored in the endpoint.
	No checks are made here on ctx, ep or msg before they are used.
*/
366 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
367 return send_msg( ctx, msg, ep->nn_sock ); // result is passed straight back to the caller