3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: These are static send/receive related functions.
26 Author: E. Scott Daniels
27 Date: 13 February 2019
45 #include <arpa/inet.h>
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
53 #include "rmr.h" // things the users see
54 #include "rmr_private.h" // things that we need too
55 #include "rmr_symtab.h"
57 #include "ring_static.c" // message ring support
58 #include "rt_generic_static.c" // generic route table (not nng/nano specific)
59 #include "rtable_static.c" // route table things (nano specific)
60 #include "tools_static.c"
65 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66 a new message struct as well. Size is the size of the zc buffer to allocate (not
67 including our header). If size is 0, then the buffer allocated is the size previously
68 allocated (if msg is !nil) or the default size given at initialisation).
71 The trlo parm is the trace length override which will be used if not 0. If 0, then the
72 length in the context is used (default).
74 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
77 int tr_len; // length to allocate for trace info
79 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
81 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
82 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
85 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
87 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
91 mlen = msg->alloc_len; // msg given, allocate the same size as before
94 memset( msg, 0, sizeof( *msg ) );
96 if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
97 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
101 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // must ensure that header portion of tpbuf is 0
102 msg->tp_buf = msg->header;
103 hdr = (uta_mhdr_t *) msg->header;
104 hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version
105 hdr->sub_id = htonl( UNSET_SUBID );
107 SET_HDR_TR_LEN( hdr, tr_len ); // set the actual length used
108 //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas
109 //SET_HDR_D2_LEN( hdr, ctx->d1_len );
111 msg->len = 0; // length of data in the payload
112 msg->alloc_len = mlen; // length of allocated payload
113 msg->sub_id = UNSET_SUBID;
114 msg->mtype = UNSET_MSGTYPE;
115 msg->payload = PAYLOAD_ADDR( hdr ); // point at the payload in transport
116 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
117 msg->state = state; // fill in caller's state (likely the state of the last operation)
118 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
119 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
120 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
122 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
128 This will clone a message into a new zero copy buffer and return the cloned message.
130 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
131 rmr_mbuf_t* nm; // new message buffer
134 if( old_msg == NULL ) {
138 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
140 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
143 memset( nm, 0, sizeof( *nm ) );
145 mlen = old_msg->alloc_len; // length allocated before
146 if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
147 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
151 memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
153 nm->mtype = old_msg->mtype;
154 nm->sub_id = old_msg->sub_id;
155 nm->len = old_msg->len; // length of data in the payload
156 nm->alloc_len = mlen; // length of allocated payload
157 nm->payload = PAYLOAD_ADDR( nm->header ); // reference the payload
158 nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
159 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
160 nm->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
161 memcpy( nm->payload, old_msg->payload, old_msg->len );
166 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
167 rmr_mbuf_t* nm; // new message buffer
172 int tr_old_len; // tr size in new buffer
175 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
177 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
180 memset( nm, 0, sizeof( *nm ) );
182 hdr = old_msg->header;
183 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
185 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
186 if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
187 if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
188 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
192 nm->tp_buf = nm->header; // in nano both are the same
193 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
194 switch( ntohl( v1hdr->rmr_ver ) ) {
196 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
197 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
200 default: // current message always caught here
202 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
203 if( RMR_D1_LEN( hdr ) ) {
204 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
207 if( RMR_D2_LEN( hdr ) ) {
208 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) ); // copy data1 and data2 if necessary
211 SET_HDR_TR_LEN( hdr, tr_len ); // len MUST be set before pointing payload
212 nm->payload = PAYLOAD_ADDR( hdr ); // reference user payload
216 // --- these are all version agnostic -----------------------------------
217 nm->mtype = old_msg->mtype;
218 nm->sub_id = old_msg->sub_id;
219 nm->len = old_msg->len; // length of data in the payload
220 nm->alloc_len = mlen; // length of allocated payload
222 nm->xaction = hdr->xid; // reference xaction
223 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
224 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
225 memcpy( nm->payload, old_msg->payload, old_msg->len );
231 This is the receive work horse used by the outer layer receive functions.
232 It waits for a message to be received on our listen socket. If old msg
233 is passed in, the we assume we can use it instead of allocating a new
234 one, else a new block of memory is allocated.
236 This allocates a zero copy message so that if the user wishes to call
237 uta_rts_msg() the send is zero copy.
239 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
240 int nn_sock; // endpoint socket for send
242 rmr_mbuf_t* msg = NULL; // msg received
248 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
251 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // total space (header + payload len) allocated
252 if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here
253 hdr = (uta_mhdr_t *) msg->header;
254 msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size)
255 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
256 msg->state = RMR_ERR_TRUNC;
257 msg->len = msg->state - RMR_HDR_LEN( hdr );
259 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
260 msg->sub_id = ntohl( hdr->sub_id ); // capture and convert from network order to local order
262 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
263 msg->payload = PAYLOAD_ADDR( msg->header );
264 msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id
265 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
266 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
269 msg->state = RMR_ERR_EMPTY;
277 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
278 message buffer cannot be used to send, and the length information may or may
279 not be correct (it is set to the length received which might be more than the
280 bytes actually in the payload).
282 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
283 int nn_sock; // endpoint socket for send
285 rmr_mbuf_t* msg = NULL; // msg received
290 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
293 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length
294 if( msg->state >= 0 ) {
296 msg->mtype = UNSET_MSGTYPE;
297 msg->sub_id = UNSET_SUBID;
298 msg->len = msg->state; // no header; len is the entire thing received
300 msg->flags = MFL_RAW; // prevent any sending of this headerless buffer
301 msg->payload = msg->header;
302 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
305 msg->state = RMR_ERR_EMPTY;
308 msg->mtype = UNSET_MSGTYPE;
309 msg->sub_id = UNSET_SUBID;
316 This does the hard work of actually sending the message to the given socket. On success,
317 a new message struct is returned. On error, the original msg is returned with the state
318 set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
319 buffer will not be allocated and returned (mostly for call() interal processing since
320 the return message from call() is a received buffer, not a new one).
322 Called by rmr_send_msg() and rmr_rts_msg().
324 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
327 int tr_len; // length from the message being sent (must snarf before send to use after send)
329 // future: ensure that application did not overrun the XID buffer; last byte must be 0
331 //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
332 hdr = (uta_mhdr_t *) msg->header;
333 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport
334 hdr->sub_id = htonl( msg->sub_id );
335 hdr->plen = htonl( msg->len );
337 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
338 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
339 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
342 tr_len = RMR_TR_LEN( hdr );
343 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
344 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
347 msg->header = NULL; // nano frees; don't risk accessing later by mistake
350 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
355 // future: if nano sends bytes, but less than mlen, then what to do?
356 if( msg->state >= 0 ) { // successful send
357 if( !(msg->flags & MFL_NOALLOC) ) { // if noalloc is set, then caller doesn't want a new buffer
358 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
360 rmr_free_msg( msg ); // not wanting a meessage back, trash this one
363 } else { // send failed -- return original message
364 if( errno == EAGAIN ) {
365 msg->state = RMR_ERR_RETRY; // some wrappers can't see errno, make this obvious
367 msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason
369 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
377 A generic wrapper to the real send to keep wormhole stuff agnostic.
378 We assume the wormhole function vetted the buffer so we don't have to.
380 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
381 return send_msg( ctx, msg, ep->nn_sock );