1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_nng_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
41 Translates the nng state passed in to one of ours that is suitable to put
42 into the message, and sets errno to something that might be useful.
43 If we don't have a specific RMr state, then we return the default (e.g.
// xlate_nng_state: translate an nng return code (state) into an RMr state value
// suitable for msg->state, setting errno along the way; def_state is the RMr
// state returned when no specific mapping exists.
// NOTE(review): only fragments of the switch are visible in this view (the case
// labels for the second RMR_ERR_RETRY and for the EBADFD assignments, and the
// final return, are not shown) -- confirm the mappings against the full source.
46 static inline int xlate_nng_state( int state, int def_state ) {
54 case NNG_EAGAIN: // soft errors get retry as the RMr error
55 state = RMR_ERR_RETRY;
60 state = RMR_ERR_RETRY;
80 errno = EBADFD; // file descriptor not in a good state for the operation
85 errno = EBADFD; // file descriptor not in a good state for the operation
99 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
100 a new message struct as well. Size is the size of the zc buffer to allocate (not
101 including our header). If size is 0, then the buffer allocated is the size previously
102 allocated (if msg is !nil) or the default size given at initialisation.
104 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
105 the context value is used.
107 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
108 are zero copy, however ONLY the message is zero copy. We now allocate and use
// alloc_zcmsg: allocate an nng-backed zero-copy transport buffer and wire an
// mbuf to it (a new mbuf is malloc'd as well when msg is NULL).
//   size  - payload bytes wanted; <= 0 selects ctx->max_plen. When msg is given
//           the previous msg->alloc_len is reused instead.
//   state - RMr state stored in the returned mbuf's state field
//   trlo  - trace length override; <= 0 selects ctx->trace_data_len
// The mbuf is zeroed, the header is initialised (version, sub_id, lengths) and
// payload/xaction are pointed into the new buffer. Aborts if nng cannot
// allocate the transport buffer.
// Fixes: the trace length recorded in the header is now the same tr_len that
// was used to size the buffer (previously ctx->trace_data_len was always
// written, so an override shifted PAYLOAD_ADDR away from the reserved space),
// and the nng_msg_alloc() result no longer overwrites the caller's state.
111 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
112 size_t mlen; // size of the transport buffer that we'll allocate
113 uta_mhdr_t* hdr; // convenience pointer
114 int tr_len; // trace data len (default or override)
int nng_rc; // nng_msg_alloc() result; kept apart from the caller's state
116 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
118 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
119 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
122 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
124 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
128 mlen = msg->alloc_len; // msg given, allocate the same size as before
131 memset( msg, 0, sizeof( *msg ) );
133 if( (nng_rc = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
134 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
135 abort( ); // toss out a core file for this
138 msg->header = nng_msg_body( msg->tp_buf );
139 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
140 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
141 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
142 hdr->sub_id = htonl( UNSET_SUBID );
143 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
144 SET_HDR_TR_LEN( hdr, tr_len ); // must match the trace length used to compute mlen so PAYLOAD_ADDR lines up
145 //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them
146 //SET_HDR_D2_LEN( hdr, ctx->d2_len );
148 msg->len = 0; // length of data in the payload
149 msg->alloc_len = mlen; // length of allocated transport buffer
150 msg->sub_id = UNSET_SUBID;
151 msg->mtype = UNSET_MSGTYPE;
152 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
153 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
154 msg->state = state; // fill in caller's state (likely the state of the last operation)
155 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
156 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
158 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
164 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
165 NNG receive must allocate that on its own.
// alloc_mbuf: allocate and zero a bare rmr_mbuf_t with no transport buffer
// attached; the nng receive path supplies msg->tp_buf afterwards.
// sub_id/mtype are set to their UNSET values and len to -1 (no payload yet).
// NOTE(review): in the lines visible here the `state` parameter is never applied
// (state is forced to RMR_ERR_UNSET) and `hdr` is unused -- confirm intent.
167 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
169 uta_mhdr_t* hdr; // convenience pointer
172 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
// NOTE(review): the log label below names rmr_alloc_zc although this is alloc_mbuf.
174 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
178 memset( msg, 0, sizeof( *msg ) );
180 msg->sub_id = UNSET_SUBID;
181 msg->mtype = UNSET_MSGTYPE;
184 msg->len = -1; // no payload; invalid len
188 msg->state = RMR_ERR_UNSET;
195 This accepts a message with the assumption that only the tp_buf pointer is valid. It
196 sets all of the various header/payload/xaction pointers in the mbuf to the proper
197 spot in the transport layer buffer. The len in the header is assumed to be the
198 allocated len (a receive buffer that nng created);
200 The alen parm is the assumed allocated length; assumed because it's a value likely
201 to have come from nng receive and the actual alloc len might be larger, but we
202 can only assume this is the total usable space.
204 This function returns the message with an error state set if it detects that the
205 received message might have been truncated. Check is done here as the calculation
206 is somewhat based on header version.
// ref_tpbuf: point msg->header/payload/xaction at the nng buffer already held in
// msg->tp_buf and decode len, mtype and sub_id from the header. alen is used as
// the usable length of that buffer. If the sender's plen would not fit in
// alen minus the header length, state becomes RMR_ERR_TRUNC and len is clipped.
208 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
209 uta_mhdr_t* hdr; // current header
210 uta_v1mhdr_t* v1hdr; // version 1 header
212 int hlen; // header len to use for a truncation check
214 msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
215 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
217 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
219 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
221 ver = ntohl( v1hdr->rmr_ver );
// version 1 header path (its case label is not visible in this view)
226 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
227 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
228 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
230 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
231 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
232 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
233 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
235 hlen = sizeof( uta_v1mhdr_t );
238 default: // current version always lands here
239 hdr = (uta_mhdr_t *) msg->header;
240 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
241 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
243 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
244 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
245 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
246 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
247 msg->sub_id = ntohl( hdr->sub_id );
248 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): the payload begins at PAYLOAD_ADDR(hdr), i.e. after any trace and
// d1/d2 areas. clone_msg sizes the full pre-payload area as RMR_HDR_LEN +
// RMR_TR_LEN + RMR_D1_LEN + RMR_D2_LEN, which suggests RMR_HDR_LEN alone may
// under-count here and let len exceed the room after msg->payload -- confirm
// against the macro definitions.
252 if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error
253 msg->state = RMR_ERR_TRUNC;
254 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
261 This will clone a message into a new zero copy buffer and return the cloned message.
// clone_msg: duplicate old_msg into a newly allocated zero-copy nng buffer of the
// same alloc_len. The header is copied; for current-version headers the trace
// and d1/d2 areas come with it. Then old_msg->len payload bytes are copied.
// The new mbuf's header, payload and xaction pointers all reference the NEW buffer.
// Fix: v1hdr points at the OLD header, so the v1 path previously memcpy'd the old
// header onto itself and set payload inside the old buffer. It now copies into
// nm->header. hdr is also set to the new header for every version before
// xaction is derived from it.
263 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
264 rmr_mbuf_t* nm; // new message buffer
270 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
272 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
275 memset( nm, 0, sizeof( *nm ) );
277 mlen = old_msg->alloc_len; // length allocated before
278 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
279 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
283 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
hdr = nm->header; // every header write below must land in the NEW buffer
284 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
285 switch( ntohl( v1hdr->rmr_ver ) ) {
287 memcpy( nm->header, old_msg->header, sizeof( *v1hdr ) ); // copy complete v1 header into the new buffer
288 nm->payload = (void *) ((char *) nm->header + sizeof( *v1hdr )); // payload directly follows the v1 header in the new buffer
291 default: // current message always caught here
293 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
294 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
298 // --- these are all version agnostic -----------------------------------
299 nm->mtype = old_msg->mtype;
300 nm->sub_id = old_msg->sub_id;
301 nm->len = old_msg->len; // length of data in the payload
302 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): for v1 messages this reads xid through the current header layout;
// assumes xid sits at the same offset in uta_v1mhdr_t -- confirm.
304 nm->xaction = hdr->xid; // reference xaction (in the new buffer)
305 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
306 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
307 memcpy( nm->payload, old_msg->payload, old_msg->len );
313 This will clone a message with a change to the trace area in the header such that
314 its length is the tr_len passed in. The trace area in the cloned message will be uninitialised.
315 The original message will be left unchanged, and a pointer to the new message is returned.
316 It is not possible to realloc buffers and change the data sizes.
// realloc_msg: clone old_msg into a new zero-copy buffer whose trace area is
// tr_len bytes instead of the old trace length. The fixed header, the d1/d2
// data and the payload are copied. The new trace area is left uninitialised
// and old_msg is not modified.
// Fixes compared with the previous version:
//  - every header write targets the NEW buffer (the v1 path used to memcpy the
//    old header onto itself and point payload into the old buffer);
//  - the new trace length is recorded before d1/d2 are placed. d1/d2 are read
//    at their offsets in the OLD header and written at the offsets implied by
//    the new trace length;
//  - offsets are applied as byte offsets via (char *) casts; previously
//    `hdr + coffset` scaled them by sizeof( *hdr ) and ran out of bounds;
//  - the duplicate SET_HDR_TR_LEN call was dropped.
318 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
319 rmr_mbuf_t* nm; // new message buffer
324 int tr_old_len; // tr size in old buffer
325 int coffset; // an offset to something in the header for copy
327 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
329 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
332 memset( nm, 0, sizeof( *nm ) );
334 hdr = old_msg->header;
335 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
337 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
338 if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
339 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
340 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
344 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
hdr = nm->header; // from here on hdr is the NEW header; the old one is reached only via old_msg->header
345 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
346 switch( ntohl( v1hdr->rmr_ver ) ) {
348 memcpy( nm->header, old_msg->header, sizeof( *v1hdr ) ); // copy complete v1 header into the new buffer
349 nm->payload = (void *) ((char *) nm->header + sizeof( *v1hdr )); // payload directly follows the v1 header in the new buffer
352 default: // current message always caught here
354 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data might have changed
365 SET_HDR_TR_LEN( hdr, tr_len ); // MUST be set before computing the new d1/d2/payload offsets; old trace data is NOT copied
355 if( RMR_D1_LEN( hdr ) ) {
356 coffset = DATA1_OFFSET( (uta_mhdr_t *) old_msg->header ); // offset to d1 in the OLD buffer (old trace length)
357 memcpy( (char *) hdr + DATA1_OFFSET( hdr ), (char *) old_msg->header + coffset, RMR_D1_LEN( hdr ) ); // copy data1 to its new offset
360 if( RMR_D2_LEN( hdr ) ) {
361 coffset = DATA2_OFFSET( (uta_mhdr_t *) old_msg->header ); // offset to d2 in the OLD buffer (old trace length)
362 memcpy( (char *) hdr + DATA2_OFFSET( hdr ), (char *) old_msg->header + coffset, RMR_D2_LEN( hdr ) ); // copy data2 to its new offset
366 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
371 // --- these are all version agnostic -----------------------------------
372 nm->mtype = old_msg->mtype;
373 nm->sub_id = old_msg->sub_id;
374 nm->len = old_msg->len; // length of data in the payload
375 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): for v1 messages this reads xid through the current header layout,
// and tr_old_len above reads trace-length fields a v1 header may not have -- confirm.
377 nm->xaction = hdr->xid; // reference xaction (in the new buffer)
378 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
379 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
380 memcpy( nm->payload, old_msg->payload, old_msg->len );
386 This is the receive work horse used by the outer layer receive functions.
387 It waits for a message to be received on our listen socket. If old msg
388 is passed in, then we assume we can use it instead of allocating a new
389 one, else a new block of memory is allocated.
391 This allocates a zero copy message so that if the user wishes to call
392 rmr_rts_msg() the send is zero copy.
394 The nng timeout on send is at the ms level which is a tad too long for
395 our needs. So, if NNG returns eagain or timedout (we don't set one)
396 we will loop up to 5 times with a 10 microsecond delay between each
397 attempt. If at the end of this set of retries NNG is still saying
398 eagain/timeout we'll return to the caller with that set in errno.
399 Right now this is only for zero-copy buffers (they should all be zc
403 In the NNG msg world it must allocate the receive buffer rather
404 than accepting one that we allocated from their space and could
405 reuse. They have their reasons I guess. Thus, we will free
406 the old transport buffer if user passes the message in; at least
407 our mbuf will be reused.
// rcv_msg: block on ctx->nn_sock until nng delivers a message, then point the
// mbuf's header/payload/xaction at the received nng buffer via ref_tpbuf().
// A transport buffer already attached to a reused mbuf is freed first because
// nng_recvmsg() supplies its own buffer. Received buffers get MFL_ADDSRC so a
// later send overwrites the source with our name. When fewer than
// sizeof( uta_v1mhdr_t ) bytes arrive, the state appears to be set to
// RMR_ERR_EMPTY (the enclosing else line is not visible in this view).
409 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
411 rmr_mbuf_t* msg = NULL; // msg received
413 size_t rsize; // nng needs to write back the size received... grrr
417 if( msg->tp_buf != NULL ) {
418 nng_msg_free( msg->tp_buf );
423 msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
431 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
432 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
436 if( msg->tp_buf == NULL ) { // if state is good this _should_ not be nil, but paranoia says check anyway
437 msg->state = RMR_ERR_EMPTY;
441 rsize = nng_msg_len( msg->tp_buf );
442 if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here
443 ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed
444 hdr = (uta_mhdr_t *) msg->header;
445 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
// NOTE(review): the pointer difference below is a ptrdiff_t; %td would be the matching specifier for %ld.
447 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
448 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
450 msg->state = RMR_ERR_EMPTY;
452 msg->alloc_len = rsize;
455 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
456 msg->mtype = UNSET_MSGTYPE;
457 msg->sub_id = UNSET_SUBID;
464 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
465 message buffer cannot be used to send, and the length information may or may
466 not be correct (it is set to the length received which might be more than the
467 bytes actually in the payload).
469 Mostly this supports the route table collector, but could be extended with an
470 API external function.
// rcv_payload: receive a raw (header-less) message from a non-RMr sender. The
// whole nng body is exposed as the payload and len is the received byte count.
// flags is assigned MFL_RAW only, which clears MFL_ZEROCOPY so the buffer is not
// treated as sendable.
// NOTE(review): alloc_zcmsg() attaches an nng_msg to msg->tp_buf, and the
// nng_recvmsg() call below overwrites that pointer. No free of the prior buffer
// is visible here -- possible leak, confirm against the full source.
472 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
474 rmr_mbuf_t* msg = NULL; // msg received
475 size_t rsize; // nng needs to write back the size received... grrr
480 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
483 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
484 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
487 rsize = nng_msg_len( msg->tp_buf );
489 // do NOT use ref_tpbuf() here! Must fill these in manually (ref_tpbuf would decode the bytes as an RMr header).
490 msg->header = nng_msg_body( msg->tp_buf );
491 msg->len = rsize; // len is the number of bytes received
492 msg->alloc_len = rsize;
493 msg->mtype = UNSET_MSGTYPE; // raw message has no type
494 msg->sub_id = UNSET_SUBID; // nor a subscription id
496 msg->flags = MFL_RAW;
497 msg->payload = msg->header; // payload is the whole thing; no header
500 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
506 This does the hard work of actually sending the message to the given socket. On success,
507 a new message struct is returned. On error, the original msg is returned with the state
508 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
509 buffer will not be allocated and returned (mostly for call() internal processing since
510 the return message from call() is a received buffer, not a new one).
512 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
513 validation has been done prior.
// send_msg: stamp mtype, sub_id and len into the header in network byte order,
// then send msg on nn_sock. Receive buffers flagged MFL_ADDSRC get our name
// written over the source first.
// Zero-copy buffers go to nng_sendmsg() with NNG_FLAG_NONBLOCK. On NNG_EAGAIN or
// NNG_ETIMEDOUT, and only while retries > 0, the send is retried. The CPU is given
// up with usleep( 1 ) once spin_retries (initially 1000) has counted down to zero;
// any reset of spin_retries is not visible in this view.
// On success a fresh zero-copy buffer carrying the sent message's trace length
// is returned via alloc_zcmsg(). When MFL_NOALLOC is set, msg is released with
// rmr_free_msg() instead. On failure the original msg is returned:
// EAGAIN/ETIMEDOUT map to RMR_ERR_RETRY and other codes go through xlate_nng_state().
515 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
518 int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
519 int spin_retries = 1000; // if eagain/timeout we'll spin this many times before giving up the CPU
520 int tr_len; // trace len in sending message so we alloc new message with same trace size
522 // future: ensure that application did not overrun the XID buffer; last byte must be 0
524 hdr = (uta_mhdr_t *) msg->header;
525 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
526 hdr->sub_id = htonl( msg->sub_id );
527 hdr->plen = htonl( msg->len );
528 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
530 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
531 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
536 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
538 if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
540 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
541 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
543 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsecond
547 state = 0; // don't loop
548 //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
553 msg->header = NULL; // nano frees; don't risk accessing later by mistake
557 } while( state && retries > 0 );
559 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
560 msg->state = RMR_ERR_SENDFAILED;
565 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
567 //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
572 if( msg->state == RMR_OK ) { // successful send
573 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
574 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
576 rmr_free_msg( msg ); // not wanting a message back, trash this one
579 } else { // send failed -- return original message
580 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
582 msg->state = RMR_ERR_RETRY; // errno will have nano reason
584 msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
// NOTE(review): msg->state is an RMr state here, not an errno value, so
// strerror( msg->state ) prints an unrelated message; strerror( errno ) is likely intended.
587 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
594 A generic wrapper to the real send to keep wormhole stuff agnostic.
595 We assume the wormhole function vetted the buffer so we don't have to.
// send2ep: send msg through the endpoint's nng socket. retries is passed as -1,
// so send_msg's `retries > 0` checks never hold and no retry loop is performed.
597 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
598 return send_msg( ctx, msg, ep->nn_sock, -1 );