3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: These are static send/receive related functions.
26 Author: E. Scott Daniels
27 Date: 13 February 2019
45 #include <arpa/inet.h>
47 #include <nanomsg/nn.h>
48 #include <nanomsg/tcp.h>
49 #include <nanomsg/pair.h>
50 #include <nanomsg/pipeline.h>
51 #include <nanomsg/pubsub.h>
53 #include "rmr.h" // things the users see
54 #include "rmr_private.h" // things that we need too
55 #include "rmr_symtab.h"
57 #include "ring_static.c" // message ring support
58 #include "rt_generic_static.c" // generic route table (not nng/nano specific)
59 #include "rtable_static.c" // route table things (nano specific)
60 #include "tools_static.c"
65 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
66 a new message struct as well. Size is the size of the zc buffer to allocate (not
67 including our header). If size is 0, then the buffer allocated is the size previously
68 allocated (if msg is !nil) or the default size given at initialisation).
71 The trlo parm is the trace length override which will be used if not 0. If 0, then the
72 length in the context is used (default).
74 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
77 int tr_len; // length to allocate for trace info
79 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
81 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
82 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
85 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
87 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for message\n" );
91 mlen = msg->alloc_len; // msg given, allocate the same size as before
94 memset( msg, 0, sizeof( *msg ) );
96 if( (msg->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
97 fprintf( stderr, "[CRIT] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", errno );
101 hdr = (uta_mhdr_t *) msg->header;
102 hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version
103 hdr->sub_id = htonl( UNSET_SUBID );
105 SET_HDR_TR_LEN( hdr, tr_len ); // set the actual length used
106 //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas
107 //SET_HDR_D2_LEN( hdr, ctx->d1_len );
109 msg->len = 0; // length of data in the payload
110 msg->alloc_len = mlen; // length of allocated payload
111 msg->payload = PAYLOAD_ADDR( hdr ); // point at the payload in transport
112 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
113 msg->state = state; // fill in caller's state (likely the state of the last operation)
114 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
115 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
117 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen = %d size=%d mpl=%d flags=%02x %p m=%p @%p\n", mlen, size, ctx->max_plen, msg->flags, &msg->flags, msg, msg->header );
123 This will clone a message into a new zero copy buffer and return the cloned message.
125 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
126 rmr_mbuf_t* nm; // new message buffer
129 if( old_msg == NULL ) {
133 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
135 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for message buffer\n" );
138 memset( nm, 0, sizeof( *nm ) );
140 mlen = old_msg->alloc_len; // length allocated before
141 if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
142 fprintf( stderr, "[CRIT] rmr_clone: cannot get memory for zero copy buffer: %d\n", errno );
146 memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
148 nm->mtype = old_msg->mtype;
149 nm->sub_id = old_msg->sub_id;
150 nm->len = old_msg->len; // length of data in the payload
151 nm->alloc_len = mlen; // length of allocated payload
152 nm->payload = PAYLOAD_ADDR( nm->header ); // reference the payload
153 nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
154 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
155 nm->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
156 memcpy( nm->payload, old_msg->payload, old_msg->len );
161 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
162 rmr_mbuf_t* nm; // new message buffer
167 int tr_old_len; // tr size in new buffer
170 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
172 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
175 memset( nm, 0, sizeof( *nm ) );
177 hdr = old_msg->header;
178 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
180 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
181 if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
182 if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) { // this will be released on send, so DO NOT free
183 fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
187 nm->tp_buf = nm->header; // in nano both are the same
188 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
189 switch( ntohl( v1hdr->rmr_ver ) ) {
191 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
192 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
195 default: // current message always caught here
197 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
198 if( RMR_D1_LEN( hdr ) ) {
199 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
202 if( RMR_D2_LEN( hdr ) ) {
203 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) ); // copy data1 and data2 if necessary
206 SET_HDR_TR_LEN( hdr, tr_len ); // len MUST be set before pointing payload
207 nm->payload = PAYLOAD_ADDR( hdr ); // reference user payload
211 // --- these are all version agnostic -----------------------------------
212 nm->mtype = old_msg->mtype;
213 nm->sub_id = old_msg->sub_id;
214 nm->len = old_msg->len; // length of data in the payload
215 nm->alloc_len = mlen; // length of allocated payload
217 nm->xaction = hdr->xid; // reference xaction
218 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
219 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
220 memcpy( nm->payload, old_msg->payload, old_msg->len );
226 This is the receive work horse used by the outer layer receive functions.
227 It waits for a message to be received on our listen socket. If old msg
228 is passed in, the we assume we can use it instead of allocating a new
229 one, else a new block of memory is allocated.
231 This allocates a zero copy message so that if the user wishes to call
232 uta_rts_msg() the send is zero copy.
234 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
235 int nn_sock; // endpoint socket for send
237 rmr_mbuf_t* msg = NULL; // msg received
243 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
246 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // total space (header + payload len) allocated
247 if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here
248 hdr = (uta_mhdr_t *) msg->header;
249 msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size)
250 if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
251 msg->state = RMR_ERR_TRUNC;
252 msg->len = msg->state - RMR_HDR_LEN( hdr );
254 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
255 msg->sub_id = ntohl( hdr->sub_id ); // capture and convert from network order to local order
257 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
258 msg->payload = PAYLOAD_ADDR( msg->header );
259 msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id
260 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
261 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
264 msg->state = RMR_ERR_EMPTY;
272 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
273 message buffer cannot be used to send, and the length information may or may
274 not be correct (it is set to the length received which might be more than the
275 bytes actually in the payload).
277 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
278 int nn_sock; // endpoint socket for send
280 rmr_mbuf_t* msg = NULL; // msg received
285 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
288 msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length
289 if( msg->state >= 0 ) {
291 msg->mtype = UNSET_MSGTYPE;
292 msg->sub_id = UNSET_SUBID;
293 msg->len = msg->state; // no header; len is the entire thing received
295 msg->flags = MFL_RAW; // prevent any sending of this headerless buffer
296 msg->payload = msg->header;
297 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
300 msg->state = RMR_ERR_EMPTY;
303 msg->mtype = UNSET_MSGTYPE;
304 msg->sub_id = UNSET_SUBID;
311 This does the hard work of actually sending the message to the given socket. On success,
312 a new message struct is returned. On error, the original msg is returned with the state
313 set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
314 buffer will not be allocated and returned (mostly for call() interal processing since
315 the return message from call() is a received buffer, not a new one).
317 Called by rmr_send_msg() and rmr_rts_msg().
319 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
322 int tr_len; // length from the message being sent (must snarf before send to use after send)
324 // future: ensure that application did not overrun the XID buffer; last byte must be 0
326 hdr = (uta_mhdr_t *) msg->header;
327 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport
328 hdr->sub_id = htonl( msg->sub_id );
329 hdr->plen = htonl( msg->len );
331 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
332 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
335 tr_len = RMR_TR_LEN( hdr );
336 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
337 if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
340 msg->header = NULL; // nano frees; don't risk accessing later by mistake
343 if( (state = nn_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, NN_DONTWAIT )) < 0 ) {
348 // future: if nano sends bytes, but less than mlen, then what to do?
349 if( msg->state >= 0 ) { // successful send
350 if( !(msg->flags & MFL_NOALLOC) ) { // if noalloc is set, then caller doesn't want a new buffer
351 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
353 rmr_free_msg( msg ); // not wanting a meessage back, trash this one
356 } else { // send failed -- return original message
357 if( errno == EAGAIN ) {
358 msg->state = RMR_ERR_RETRY; // some wrappers can't see errno, make this obvious
360 msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason
362 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %s\n", strerror( errno ) );
370 A generic wrapper to the real send to keep wormhole stuff agnostic.
371 We assume the wormhole function vetted the buffer so we don't have to.
373 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
374 return send_msg( ctx, msg, ep->nn_sock );