1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_nng_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
42 Translates the nng state passed in to one of ours that is suitable to put
43 into the message, and sets errno to something that might be useful.
44 If we don't have a specific RMr state, then we return the default (e.g.
// xlate_nng_state: map an nng return code (state) to an RMR state suitable for
// storing in a message, and set errno to something useful for the caller.
// def_state is presumably the RMR state returned when no specific mapping exists;
// the switch's default arm is not visible in this excerpt -- confirm.
// NOTE(review): only part of the switch body is shown; the case labels guarding the
// second RETRY assignment and the two EBADFD assignments are not visible here.
47 static inline int xlate_nng_state( int state, int def_state ) {
55 case NNG_EAGAIN: // soft errors are reported to the caller as RMR_ERR_RETRY
56 state = RMR_ERR_RETRY;
61 state = RMR_ERR_RETRY; // another (label not visible) nng code also maps to retry
81 errno = EBADFD; // file descriptor not in a good state for the operation
86 errno = EBADFD; // file descriptor not in a good state for the operation
100 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
101 a new message struct as well. Size is the size of the zc buffer to allocate (not
102 including our header). If size is 0 (or negative), then the buffer allocated is the size previously
103 allocated (if msg is !nil) or the default size given at initialisation.
105 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
106 are zero copy, however ONLY the message is zero copy. We now allocate and use
// alloc_zcmsg: attach a freshly allocated nng zero-copy transport buffer to an mbuf.
//   ctx   - supplies header sizing (trace_data_len, d1_len, d2_len), the default payload
//           size (max_plen) and the source name (my_name) stamped into the header.
//   msg   - NULL to malloc a new mbuf; otherwise an existing mbuf whose previous
//           alloc_len is reused as the transport buffer size.
//   size  - payload bytes wanted; any value <= 0 selects ctx->max_plen.
//   state - intended to be recorded in msg->state (see NOTE at the nng_msg_alloc call).
// The mbuf struct is cleared, the header is zeroed and initialised (version, header
// length, trace length), and payload/xaction are pointed into the new buffer.
// A failed nng_msg_alloc() aborts the process.
// NOTE(review): the return statement and the malloc() failure path are not visible in
// this excerpt; presumably msg is returned.
109 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
110 size_t mlen; // size of the transport buffer that we'll allocate
111 uta_mhdr_t* hdr; // convenience pointer
113 mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
114 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
117 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
119 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
123 mlen = msg->alloc_len; // msg given, allocate the same size as before
126 memset( msg, 0, sizeof( *msg ) );
// NOTE(review): this assignment overwrites the caller's `state` argument. Because a
// failure aborts, `state` is 0 afterwards, so msg->state below receives the nng result
// rather than the caller's value (unless a line not shown restores it) -- confirm.
128 if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
// NOTE(review): prints the constant ENOMEM, not the nng error actually held in `state`.
129 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
130 abort( ); // toss out a core file for this
133 msg->header = nng_msg_body( msg->tp_buf );
134 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
// NOTE(review): msg->header has already been passed to memset() above, so this NULL
// test comes too late to protect that call.
135 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
136 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
137 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
138 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
139 //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them
140 //SET_HDR_D2_LEN( hdr, ctx->d2_len );
142 msg->len = 0; // length of data in the payload
143 msg->alloc_len = mlen; // length of allocated transport buffer
144 msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits
145 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
146 msg->state = state; // fill in caller's state (likely the state of the last operation)
147 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
// NOTE(review): strncpy() leaves src unterminated when my_name is RMR_MAX_SID bytes or
// longer; confirm src has room for a terminator or that readers bound the field.
148 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
150 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
156 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
157 NNG receive must allocate that on its own.
// alloc_mbuf: malloc and clear an mbuf only. No transport buffer is attached, because
// nng receive supplies its own; len is set to -1 (no payload) and state to RMR_ERR_UNSET.
// NOTE(review): in the lines visible here neither ctx nor the `state` parameter is used
// (msg->state gets RMR_ERR_UNSET, not `state`), and hdr is never referenced. Confirm
// against the full function.
159 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
161 uta_mhdr_t* hdr; // convenience pointer
164 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
// NOTE(review): the message names rmr_alloc_zc although this is alloc_mbuf (copy/paste?).
166 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
170 memset( msg, 0, sizeof( *msg ) );
174 msg->len = -1; // no payload; invalid len
178 msg->state = RMR_ERR_UNSET;
185 This accepts a message with the assumption that only the tp_buf pointer is valid. It
186 sets all of the various header/payload/xaction pointers in the mbuf to the proper
187 spot in the transport layer buffer. The len in the header is assumed to be the
188 allocated len (a receive buffer that nng created).
190 The alen parm is the assumed allocated length; assumed because it's a value likely
191 to have come from nng receive and the actual alloc len might be larger, but we
192 can only assume this is the total usable space.
194 This function sets an error state (RMR_ERR_TRUNC) on the message if it detects that the
195 received message might have been truncated. Check is done here as the calculation
196 is somewhat based on header version.
// ref_tpbuf: point an mbuf's header/payload/xaction at the nng buffer already held in
// msg->tp_buf, and load len and mtype from the wire header (network to host order).
//   alen - usable size of the received buffer; stored in msg->alloc_len.
// Version-1 headers use the fixed uta_v1mhdr_t layout; the `default:` path (current
// versions) uses RMR_HDR_LEN()/PAYLOAD_OFFSET() of the received header. If the sender's
// plen does not fit in alen minus the header length, msg->state is set to RMR_ERR_TRUNC
// and len is clipped. In the visible lines msg->state is written only on truncation.
// NOTE(review): in the default path hlen comes from the received (sender-controlled)
// header, while rcv_msg() only guarantees alen >= sizeof(uta_v1mhdr_t). If hlen > alen,
// the subtraction in the truncation check goes negative (or wraps if alloc_len is
// unsigned), and uta_mhdr_t fields may be read past the received bytes. Consider
// rejecting alen < hlen before trusting the header.
198 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
199 uta_mhdr_t* hdr; // current header
200 uta_v1mhdr_t* v1hdr; // version 1 header
202 int hlen; // header len to use for a truncation check
204 msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
205 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
207 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
209 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
211 ver = ntohl( v1hdr->rmr_ver );
216 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
217 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
218 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
220 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
221 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
222 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
224 hlen = sizeof( uta_v1mhdr_t );
227 default: // current version always lands here
228 hdr = (uta_mhdr_t *) msg->header;
229 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
230 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
232 msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits
233 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
234 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
235 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
236 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
240 if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error
241 msg->state = RMR_ERR_TRUNC;
242 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
249 This will clone a message into a new zero copy buffer and return the cloned message.
// clone_msg: copy old_msg into a newly malloc'd mbuf that owns its own nng zero-copy
// buffer of the same alloc_len. The header (including trace/other data), payload,
// mtype, len, state and flags are copied, and MFL_ZEROCOPY is forced on in the clone.
// NOTE(review): what happens after the two allocation-failure fprintf() calls is not
// visible in this excerpt.
251 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
252 rmr_mbuf_t* nm; // new message buffer
258 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
260 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
263 memset( nm, 0, sizeof( *nm ) );
265 mlen = old_msg->alloc_len; // length allocated before
266 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
// NOTE(review): prints the constant ENOMEM rather than the nng error held in `state`.
267 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
271 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
272 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
273 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): BUG -- unless v1hdr is reassigned on the line not shown, it still points
// at old_msg->header here. This memcpy then copies the old header onto itself (identical,
// overlapping source and destination is undefined for memcpy) and nm->header is never
// filled. The payload pointer set next also lands in the OLD buffer, so the final payload
// memcpy is a self-copy too. Likely fix: point v1hdr at nm->header after reading the
// version and before copying. (Arithmetic on a void * is also a GNU extension.)
275 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
276 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
279 default: // current message always caught here
281 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
282 nm->payload = nm->header + PAYLOAD_OFFSET( hdr ); // point at the payload
286 // --- these are all version agnostic -----------------------------------
287 nm->mtype = old_msg->mtype;
288 nm->len = old_msg->len; // length of data in the payload
289 nm->alloc_len = mlen; // length of the whole allocated transport buffer (header, trace/data and payload)
// NOTE(review): the assignment to hdr is not visible here. If it happens only on the
// default path, this dereference uses an uninitialised pointer for version-1 messages.
291 nm->xaction = hdr->xid; // reference xaction
292 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
293 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
294 memcpy( nm->payload, old_msg->payload, old_msg->len );
300 This is the receive work horse used by the outer layer receive functions.
301 It waits for a message to be received on our listen socket. If old msg
302 is passed in, then we assume we can use it instead of allocating a new
303 one, else a new block of memory is allocated.
305 This allocates a zero copy message so that if the user wishes to call
306 rmr_rts_msg() the send is zero copy.
308 The nng timeout on send is at the ms level which is a tad too long for
309 our needs. So, if NNG returns eagain or timedout (we don't set one)
310 we will loop up to 5 times with a 10 microsecond delay between each
311 attempt. If at the end of this set of retries NNG is still saying
312 eagain/timeout we'll return to the caller with that set in errno.
313 Right now this is only for zero-copy buffers (they should all be zc
317 In the NNG msg world it must allocate the receive buffer rather
318 than accepting one that we allocated from their space and could
319 reuse. They have their reasons I guess. Thus, we will free
320 the old transport buffer if user passes the message in; at least
321 our mbuf will be reused.
// rcv_msg: block on ctx->nn_sock until nng delivers a message, then wrap it in an mbuf.
// When old_msg is reused (the lines selecting it are not visible; presumably msg is set
// from old_msg), any transport buffer it still holds is released with nng_msg_free(),
// because nng allocates a fresh buffer on every receive. Otherwise a bare mbuf comes
// from alloc_mbuf().
// On a receive error msg->state holds the translated RMR state. A NULL buffer yields
// RMR_ERR_EMPTY, and so (apparently) does a buffer shorter than a v1 header. A usable
// buffer is mapped by ref_tpbuf() and flagged MFL_ADDSRC so that a re-send stamps our
// own source name into it.
323 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
325 rmr_mbuf_t* msg = NULL; // msg received
327 size_t rsize; // nng needs to write back the size received... grrr
331 if( msg->tp_buf != NULL ) {
332 nng_msg_free( msg->tp_buf );
337 msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
345 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
346 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
350 if( msg->tp_buf == NULL ) { // if state is good this _should_ not be nil, but paranoia says check anyway
351 msg->state = RMR_ERR_EMPTY;
355 rsize = nng_msg_len( msg->tp_buf );
356 if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here
357 ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed
358 hdr = (uta_mhdr_t *) msg->header;
359 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
// NOTE(review): the pointer difference printed for diff= is a ptrdiff_t; %td (or a cast
// to long) would match the %ld specifier portably.
362 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
363 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
365 msg->state = RMR_ERR_EMPTY;
367 msg->alloc_len = rsize;
370 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
378 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
379 message buffer cannot be used to send, and the length information may or may
380 not be correct (it is set to the length received which might be more than the
381 bytes actually in the payload).
383 Mostly this supports the route table collector, but could be extended with an
384 API external function.
// rcv_payload: receive one raw (header-less) message from ctx->nn_sock, mostly for the
// route table collector. The whole nng buffer is exposed as the payload: len and
// alloc_len are both the received byte count, mtype is -1, and flags is set to MFL_RAW
// only, which clears the MFL_ZEROCOPY that alloc_zcmsg() set.
// NOTE(review): alloc_zcmsg() already attaches an nng buffer to msg->tp_buf, which
// nng_recvmsg() then overwrites; unless it is freed on a line not visible here, that
// buffer leaks. The same applies to a reused old_msg's tp_buf (reuse not visible here).
386 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
388 rmr_mbuf_t* msg = NULL; // msg received
389 size_t rsize; // nng needs to write back the size received... grrr
// NOTE(review): only alloc_zcmsg()'s nng_msg_alloc() failure visibly aborts; its malloc()
// failure path is not shown, so "no need to check" should be confirmed.
394 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
397 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
398 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
401 rsize = nng_msg_len( msg->tp_buf );
403 // do NOT use ref_tpbuf() here! Must fill these in manually.
404 msg->header = nng_msg_body( msg->tp_buf );
405 msg->len = rsize; // len is the number of bytes received
406 msg->alloc_len = rsize;
407 msg->mtype = -1; // raw message has no type
409 msg->flags = MFL_RAW;
410 msg->payload = msg->header; // payload is the whole thing; no header
413 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
419 This does the hard work of actually sending the message to the given socket. On success,
420 a new message struct is returned. On error, the original msg is returned with the state
421 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
422 buffer will not be allocated and returned (mostly for call() internal processing since
423 the return message from call() is a received buffer, not a new one).
425 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
426 validation has been done prior.
// send_msg: send msg on nn_sock with NNG_FLAG_NONBLOCK.
// mtype and len are first written into the header in network byte order. If
// MFL_ADDSRC is set, our name replaces the source field.
// Zero-copy buffers go through nng_sendmsg(). EAGAIN/ETIMEDOUT are retried only when
// retries > 0; once spin_retries attempts are used up it starts yielding with usleep(1).
// On success, a new zero-copy mbuf of the same size is returned unless MFL_NOALLOC is
// set, in which case msg appears to be released with rmr_free_msg(). On failure the
// original msg is returned carrying an RMR state.
428 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
431 int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
432 int spin_retries = 1000; // if eagain/timeout we'll spin this many times before giving up the CPU
434 // future: ensure that application did not overrun the XID buffer; last byte must be 0
436 hdr = (uta_mhdr_t *) msg->header;
437 hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport
438 hdr->plen = htonl( msg->len );
440 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
441 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
446 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
448 if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
450 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
451 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
453 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsecond
// NOTE(review): zeroing state here discards the nng error code unless it was saved on a
// line not visible here; the commented-out debug line below would print nng_strerror(0).
457 state = 0; // don't loop
458 //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
463 msg->header = NULL; // nng now owns/frees the buffer; don't risk accessing it later by mistake
466 } while( state && retries > 0 );
468 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
469 msg->state = RMR_ERR_SENDFAILED;
// NOTE(review): the length sent is sizeof( uta_mhdr_t ) + len. If such a buffer follows
// the alloc_zcmsg() layout (trace/d1/d2 bytes between header and payload), the tail of
// the payload is not sent -- confirm.
474 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
476 //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
481 if( msg->state == RMR_OK ) { // successful send
482 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
483 return alloc_zcmsg( ctx, msg, 0, RMR_OK ); // preallocate a zero-copy buffer and return msg
485 rmr_free_msg( msg ); // not wanting a message back, trash this one
488 } else { // send failed -- return original message
// NOTE(review): this compares msg->state against raw nng codes. It only works if msg->state
// holds the nng code at this point (the assignment is not visible), and assumes no RMR
// state value collides with NNG_EAGAIN/NNG_ETIMEDOUT.
489 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
491 msg->state = RMR_ERR_RETRY; // errno will have nano reason
493 msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
// NOTE(review): msg->state is an RMR state by now, not an errno value; strerror( errno )
// is probably what was intended.
496 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
503 A generic wrapper to the real send to keep wormhole stuff agnostic.
504 We assume the wormhole function vetted the buffer so we don't have to.
// send2ep: send msg to the nng socket held by endpoint ep. retries is passed as -1, so
// send_msg()'s `retries > 0` tests never pass: transient EAGAIN/timeout results are not
// retried on this path.
506 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
507 return send_msg( ctx, msg, ep->nn_sock, -1 );