1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_nng_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
42 Translates the nng state passed in to one of ours that is suitable to put
43 into the message, and sets errno to something that might be useful.
44 If we don't have a specific RMr state, then we return the default (e.g. receive failed).
// Maps an nng return code (state) onto an RMr state and sets errno to match.
// def_state is the RMr state the caller wants reported when there is no specific mapping.
// Only some of the case arms are visible in this view.
47 static inline int xlate_nng_state( int state, int def_state ) {
55 case NNG_EAGAIN: // soft errors get retry as the RMr error
56 state = RMR_ERR_RETRY;
// a further case (its label is not shown in this view) is also mapped to retry
61 state = RMR_ERR_RETRY;
81 errno = EBADFD; // file des not in a good state for the operation
86 errno = EBADFD; // file des not in a good state for the operation
100 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
101 a new message struct as well. Size is the size of the zc buffer to allocate (not
102 including our header). If size is 0, then the buffer allocated is the size previously
103 allocated (if msg is !nil) or the default size given at initialisation.
105 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
106 are zero copy, however ONLY the message is zero copy. We now allocate and use nng messages.
// Attaches a newly allocated nng message to an mbuf and points the mbuf fields into it.
//   ctx   - context; max_plen and my_name are read from it
//   msg   - mbuf to (re)use; a malloc of a new mbuf is also present (presumably the nil-msg path)
//   size  - payload bytes wanted; a value <= 0 selects ctx->max_plen
//   state - intended to be recorded in msg->state (see NOTE(review) at the assignment below)
// Calls abort() if nng cannot supply the buffer.
109 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
111 uta_mhdr_t* hdr; // convenience pointer
113 mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
114 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
117 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
119 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
123 mlen = msg->alloc_len; // msg given, allocate the same size as before
// every field of the mbuf is cleared before being refilled below
126 memset( msg, 0, sizeof( *msg ) );
128 if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
// NOTE(review): the value printed is the ENOMEM constant, not the nng return code held in state
129 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
130 abort( ); // toss out a core file for this
133 msg->header = nng_msg_body( msg->tp_buf );
134 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
135 hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognise old style messages someday
137 msg->len = 0; // length of data in the payload
138 msg->alloc_len = mlen; // allocated length; as computed above mlen includes sizeof( uta_mhdr_t )
139 msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
140 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
// NOTE(review): state was assigned the nng_msg_alloc() result in the condition above and the
// failure branch aborts, so this looks like it always stores 0 rather than the caller's value -- confirm
141 msg->state = state; // fill in caller's state (likely the state of the last operation)
142 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
// NOTE(review): strncpy() leaves src unterminated if my_name is RMR_MAX_SID bytes or longer
143 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
145 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
151 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
152 NNG receive must allocate that on its own.
// Allocates an mbuf with malloc() and zero fills it; no nng buffer is requested here.
// The length is set to -1 and the state to RMR_ERR_UNSET to mark the mbuf as not yet holding a message.
// NOTE(review): neither ctx nor state is referenced in the lines visible in this view.
154 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
156 uta_mhdr_t* hdr; // convenience pointer
159 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
// NOTE(review): the text below names rmr_alloc_zc although this is alloc_mbuf
161 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
165 memset( msg, 0, sizeof( *msg ) );
169 msg->len = -1; // no payload; invalid len
173 msg->state = RMR_ERR_UNSET;
180 This accepts a message with the assumption that only the tp_buf pointer is valid. It
181 sets all of the various header/payload/xaction pointers in the mbuf to the proper
182 spot in the transport layer buffer. The len in the header is assumed to be the
183 allocated len (a receive buffer that nng created);
185 The alen parm is the assumed allocated length; assumed because it's a value likely
186 to have come from nng receive and the actual alloc len might be larger, but we
187 can only assume this is the total usable space.
// Points the mbuf's header, payload and xaction fields into msg->tp_buf and converts the
// payload length and message type found in the header from network to host byte order.
//   msg  - mbuf whose tp_buf already refers to an nng message
//   alen - value recorded as msg->alloc_len
189 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
192 msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
194 hdr = (uta_mhdr_t *) msg->header;
// NOTE(review): this writes our version into the buffer rather than reading what it already holds
195 hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognise old style messages someday
196 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
197 msg->alloc_len = alen; // length of whole tp buffer (including header)
198 msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (same transport buffer)
199 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
200 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
201 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
206 This will clone a message into a new zero copy buffer and return the cloned message.
// Builds a new mbuf backed by its own nng message of the same allocated length as old_msg,
// then copies the type, length, state, flags, source id and payload bytes from old_msg.
// NOTE(review): no visible line copies the xid bytes from the old header -- confirm whether that is intended.
208 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
209 rmr_mbuf_t* nm; // new message buffer
213 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
215 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
218 memset( nm, 0, sizeof( *nm ) );
220 mlen = old_msg->alloc_len; // length allocated before
221 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
// NOTE(review): the value printed is the ENOMEM constant, not the nng return code held in state
222 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
226 nm->header = nng_msg_body( nm->tp_buf );
227 nm->mtype = old_msg->mtype;
228 nm->len = old_msg->len; // length of data in the payload
229 nm->alloc_len = mlen; // same allocated length as the original message
230 nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
231 nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
232 nm->state = old_msg->state; // carry over the state recorded in the original message
233 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
// source id is copied as a fixed RMR_MAX_SID bytes; the payload copy is limited to the bytes in use
235 memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
236 memcpy( nm->payload, old_msg->payload, old_msg->len );
242 This is the receive work horse used by the outer layer receive functions.
243 It waits for a message to be received on our listen socket. If old msg
244 is passed in, then we assume we can use it instead of allocating a new
245 one, else a new block of memory is allocated.
247 This allocates a zero copy message so that if the user wishes to call
248 rmr_rts_msg() the send is zero copy.
250 The nng timeout on send is at the ms level which is a tad too long for
251 our needs. So, if NNG returns eagain or timedout (we don't set one)
252 we will loop up to 5 times with a 10 microsecond delay between each
253 attempt. If at the end of this set of retries NNG is still saying
254 eagain/timeout we'll return to the caller with that set in errno.
255 Right now this is only for zero-copy buffers (they should all be zc).
259 In the NNG msg world it must allocate the receive buffer rather
260 than accepting one that we allocated from their space and could
261 reuse. They have their reasons I guess. Thus, we will free
262 the old transport buffer if user passes the message in; at least
263 our mbuf will be reused.
// Receives one nng message on ctx->nn_sock and wraps it in an mbuf.
//   ctx     - context supplying the listen socket
//   old_msg - mbuf offered for reuse (the lines that adopt it are not visible in this view)
// States set in the visible lines: the translated nng state on a receive error, RMR_ERR_EMPTY
// when no buffer came back or it is shorter than a header, RMR_ERR_TRUNC when the header's
// length exceeds what was received.
265 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
267 rmr_mbuf_t* msg = NULL; // msg received
269 size_t rsize; // total bytes in the received nng message (set from nng_msg_len below)
// any transport buffer still attached is released; nng_recvmsg supplies a new one
273 if( msg->tp_buf != NULL ) {
274 nng_msg_free( msg->tp_buf );
279 //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
280 msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
287 //rsize = msg->alloc_len; // set to max, and we'll get len back here too
288 //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS ); // total space (header + payload len) allocated
289 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
290 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
294 if( msg->tp_buf == NULL ) { // if state is good this _should_ not be nil, but paranoia says check anyway
295 msg->state = RMR_ERR_EMPTY;
299 rsize = nng_msg_len( msg->tp_buf );
300 if( rsize >= sizeof( uta_mhdr_t ) ) { // we need at least a full header here
302 ref_tpbuf( msg, rsize ); // point payload, header etc to the just received tp buffer
303 hdr = (uta_mhdr_t *) msg->header;
304 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
// alloc_len was set to rsize by ref_tpbuf, so this compares the sender's claimed length with the bytes after the header
305 if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) { // way more than we should have had room for; error
306 msg->state = RMR_ERR_TRUNC;
309 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
310 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
313 msg->state = RMR_ERR_EMPTY;
320 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
321 message buffer cannot be used to send, and the length information may or may
322 not be correct (it is set to the length received which might be more than the
323 bytes actually in the payload).
325 Mostly this supports the route table collector, but could be extended with an
326 API external function.
// Receives one nng message and exposes the whole thing as payload: no RMr header is assumed,
// len and alloc_len are both the received byte count, mtype is -1 and flags is exactly MFL_RAW.
// NOTE(review): old_msg is not referenced in the lines visible in this view.
328 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
330 rmr_mbuf_t* msg = NULL; // msg received
331 size_t rsize; // total bytes in the received nng message (set from nng_msg_len below)
336 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
// NOTE(review): alloc_zcmsg() stored an nng message in msg->tp_buf; the receive below writes a new
// pointer into the same field, so the first buffer appears to be leaked -- confirm
339 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
340 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
343 rsize = nng_msg_len( msg->tp_buf );
345 // do NOT use ref_tpbuf() here! Must fill these in manually.
346 msg->header = nng_msg_body( msg->tp_buf );
347 msg->len = rsize; // len is the number of bytes received
348 msg->alloc_len = rsize;
349 msg->mtype = -1; // raw message has no type
// plain assignment: any flags set by alloc_zcmsg (e.g. MFL_ZEROCOPY) are dropped
351 msg->flags = MFL_RAW;
352 msg->payload = msg->header; // payload is the whole thing; no header
355 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
361 This does the hard work of actually sending the message to the given socket. On success,
362 a new message struct is returned. On error, the original msg is returned with the state
363 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
364 buffer will not be allocated and returned (mostly for call() internal processing since
365 the return message from call() is a received buffer, not a new one).
367 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
368 validation has been done prior.
// Writes type and length into the header in network byte order, optionally overlays our source id,
// then sends on nn_sock: nng_sendmsg() for MFL_ZEROCOPY buffers, nng_send() otherwise.
//   ctx     - context; my_name is read from it
//   msg     - message to send
//   nn_sock - nng socket to send on
//   retries - EAGAIN/ETIMEDOUT are retried only while this is > 0
// On success a fresh buffer from alloc_zcmsg() is returned unless MFL_NOALLOC is set, in which case msg is freed.
370 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
373 int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
374 int spin_retries = 1000; // if eagain/timeout we'll spin this many times before giving up the CPU
376 // future: ensure that application did not overrun the XID buffer; last byte must be 0
378 hdr = (uta_mhdr_t *) msg->header;
379 hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport
380 hdr->plen = htonl( msg->len );
382 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
// NOTE(review): strncpy() leaves src unterminated if my_name is RMR_MAX_SID bytes or longer
383 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
388 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
389 //nng_flags |= NNG_FLAG_ALLOC; // indicate a zc buffer that nng is expected to free
392 if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
394 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
395 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
397 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
401 state = 0; // don't loop
402 //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
407 msg->header = NULL; // transport owns the buffer after a good send; don't risk accessing later by mistake
410 } while( state && retries > 0 );
412 msg->state = RMR_ERR_SENDFAILED;
// non zero-copy path: header plus payload bytes are handed to nng_send() as one block
417 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
419 //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
424 if( msg->state == RMR_OK ) { // successful send
425 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
426 return alloc_zcmsg( ctx, msg, 0, RMR_OK ); // preallocate a zero-copy buffer and return msg
428 rmr_free_msg( msg ); // not wanting a message back, trash this one
431 } else { // send failed -- return original message
// NOTE(review): msg->state is compared against raw NNG codes here; verify it still holds the nng value at this point
432 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
434 msg->state = RMR_ERR_RETRY; // errno will have nano reason
436 msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
437 //errno = -msg->state;
438 //msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason
// NOTE(review): msg->state holds an RMr state by now, so strerror() on it is unlikely to describe the failure; errno looks like the intended argument -- confirm
441 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
448 A generic wrapper to the real send to keep wormhole stuff agnostic.
449 We assume the wormhole function vetted the buffer so we don't have to.
// Sends msg on the endpoint's nng socket by delegating to send_msg() with retries = -1.
// NOTE(review): send_msg() retries only while retries > 0, so -1 presumably means a single attempt -- confirm.
451 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
452 return send_msg( ctx, msg, ep->nn_sock, -1 );