1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_nng_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
41 Translates the nng state passed in to one of ours that is suitable to put
42 into the message, and sets errno to something that might be useful.
43 If we don't have a specific RMr state, then we return the default (e.g.
46 The addition of the connection shut error code to the switch requires
47 that the NNG version at commit e618abf8f3db2a94269a (or after) be
48 used for compiling RMR.
// Translate an NNG status code into an RMR state and set errno for the caller.
// NOTE(review): only part of this function is visible in this view (the switch
// header, most case labels and the return statement are not shown), so the
// comments here describe just the visible statements.
50 static inline int xlate_nng_state( int state, int def_state ) {
58 case NNG_EAGAIN: // soft errors get retry as the RMr error
59 state = RMR_ERR_RETRY;
// another path (its case label is not visible here) is also mapped to the retry state
64 state = RMR_ERR_RETRY;
84 errno = EBADFD; // file des not in a good state for the operation
88 case NNG_ECONNSHUT: // new error with nng commit e618abf8f3db2a94269a79c8901a51148d48fcc2 (Sept 2019)
90 errno = EBADFD; // file des not in a good state for the operation
104 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
105 a new message struct as well. Size is the size of the zc buffer to allocate (not
106 including our header). If size is 0, then the buffer allocated is the size previously
107 allocated (if msg is !nil) or the default size given at initialisation.
109 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
110 the context value is used.
112 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
113 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Allocate an NNG transport buffer, attach it to msg (allocating the mbuf too when
// needed), and initialise the RMR header and the mbuf fields that reference it.
//   ctx   - context supplying default lengths (trace, d1, d2, max payload) and our name/ip
//   msg   - existing mbuf to refill; a fresh one is malloc'd on the path at line 127
//   size  - requested payload size; <= 0 selects ctx->max_plen
//   state - value stored in msg->state (but see the note at the nng_msg_alloc call)
//   trlo  - trace length override; used only when > 0
// NOTE(review): several lines of this function (if/else headers, closing braces, the
// return) are not visible in this view.
116 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
117 size_t mlen; // size of the transport buffer that we'll allocate
118 uta_mhdr_t* hdr; // convenience pointer
119 int tr_len; // trace data len (default or override)
121 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
123 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
124 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
127 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
129 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
// the previous allocation length must be captured here, before the memset below wipes the mbuf
133 mlen = msg->alloc_len; // msg given, allocate the same size as before
136 memset( msg, 0, sizeof( *msg ) );
// NOTE(review): the caller's state parameter is overwritten by the nng_msg_alloc() result
// here, and the log line prints the ENOMEM constant rather than that result -- confirm intent.
138 if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
139 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
140 abort( ); // toss out a core file for this
143 msg->header = nng_msg_body( msg->tp_buf );
// NOTE(review): the header is zeroed before the NULL test two lines down, so that test
// cannot protect this memset.
144 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
145 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
146 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
147 hdr->sub_id = htonl( UNSET_SUBID );
148 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): the buffer was sized with tr_len (which honours trlo), but the header trace
// length is set from ctx->trace_data_len; these differ when trlo > 0 -- verify.
149 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
150 SET_HDR_D1_LEN( hdr, ctx->d1_len );
151 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
153 msg->len = 0; // length of data in the payload
154 msg->alloc_len = mlen; // length of allocated transport buffer
155 msg->sub_id = UNSET_SUBID;
156 msg->mtype = UNSET_MSGTYPE;
157 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
158 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
159 msg->state = state; // fill in caller's state (likely the state of the last operation)
160 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
// NOTE(review): strncpy does not NUL-terminate when the source fills the limit, and srcip is
// bounded with RMR_MAX_SRC as well -- confirm both field sizes and termination expectations.
161 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
162 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
164 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
170 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
171 NNG receive must allocate that on its own.
// Allocate and zero an mbuf with no transport buffer attached; type and sub-id are
// marked unset and the length is set to -1 to signal that there is no payload.
// NOTE(review): parts of this function (declaration of msg, NULL test, return) are not
// visible in this view. In the visible lines the state parameter is never read and the
// mbuf state is set to RMR_ERR_UNSET -- confirm that is intended.
173 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
175 uta_mhdr_t* hdr; // convenience pointer
178 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
// NOTE(review): the log text names rmr_alloc_zc although this is alloc_mbuf
180 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
184 memset( msg, 0, sizeof( *msg ) );
186 msg->sub_id = UNSET_SUBID;
187 msg->mtype = UNSET_MSGTYPE;
190 msg->len = -1; // no payload; invalid len
194 msg->state = RMR_ERR_UNSET;
201 This accepts a message with the assumption that only the tp_buf pointer is valid. It
202 sets all of the various header/payload/xaction pointers in the mbuf to the proper
203 spot in the transport layer buffer. The len in the header is assumed to be the
204 allocated len (a receive buffer that nng created);
206 The alen parm is the assumed allocated length; assumed because it's a value likely
207 to have come from nng receive and the actual alloc len might be larger, but we
208 can only assume this is the total usable space.
210 This function returns the message with an error state set if it detects that the
211 received message might have been truncated. Check is done here as the calculation
212 is somewhat based on header version.
// Point the mbuf's header/payload/xaction fields into the transport buffer already
// attached as msg->tp_buf, pull mtype, sub-id and payload length out of the RMR header
// (version 1 or current layout), and flag truncation when the sender's payload length
// exceeds what fits in alen bytes after the header.
//   msg  - mbuf whose tp_buf is valid; all other fields are (re)derived here
//   alen - usable length of the transport buffer
// NOTE(review): the switch header, the case label for version 1 and the declaration of
// ver are not visible in this view.
214 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
215 uta_mhdr_t* hdr = NULL; // current header
216 uta_v1mhdr_t* v1hdr; // version 1 header
218 int hlen; // header len to use for a truncation check
220 msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
221 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
223 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
225 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
227 ver = ntohl( v1hdr->rmr_ver );
// ---- version 1 layout: fixed size header, payload follows immediately ----
232 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
233 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
234 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
236 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
237 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
238 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
239 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
241 hlen = sizeof( uta_v1mhdr_t );
// ---- current layout: header length comes from the header itself ----
244 default: // current version always lands here
245 hdr = (uta_mhdr_t *) msg->header;
246 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
247 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
249 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
250 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
251 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
252 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
253 msg->sub_id = ntohl( hdr->sub_id );
254 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): hlen for the current layout is taken from the received header; if it is
// larger than alen the subtraction below goes negative (or wraps, depending on the field
// types, which are not visible here) -- confirm the header length is validated.
258 if( msg->len > (msg->alloc_len - hlen ) ) {
259 msg->state = RMR_ERR_TRUNC;
260 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
267 This will clone a message into a new zero copy buffer and return the cloned message.
// Build a new mbuf with its own NNG transport buffer of the same allocated length as
// old_msg, copy the header (v1 or current layout) and the payload into it, and mirror
// the mbuf level fields (mtype, sub_id, len, state, flags).
// NOTE(review): declarations of hdr, v1hdr, mlen and state, the case labels, and the
// failure/return paths are not visible in this view.
269 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
270 rmr_mbuf_t* nm; // new message buffer
276 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
278 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
281 memset( nm, 0, sizeof( *nm ) );
283 mlen = old_msg->alloc_len; // length allocated before
// NOTE(review): the log line prints the ENOMEM constant, not the nng_msg_alloc() result
284 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
285 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
289 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
290 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
291 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr was set to the OLD header above; unless it is re-pointed in a line
// not visible here, the payload computed from it below refers to the old message's buffer
// rather than the clone's -- verify.
294 memcpy( hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
295 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
298 default: // current message always caught here
300 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
301 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
305 // --- these are all version agnostic -----------------------------------
306 nm->mtype = old_msg->mtype;
307 nm->sub_id = old_msg->sub_id;
308 nm->len = old_msg->len; // length of data in the payload
309 nm->alloc_len = mlen; // length of allocated payload
310 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "clone values: mty=%d sid=%d len=%d alloc=%d\n", nm->mtype, nm->sub_id, nm->len, nm->alloc_len );
312 nm->xaction = &hdr->xid[0]; // point at transaction id in header area
313 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
314 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
// copies exactly old_msg->len bytes; presumably len never exceeds the payload space -- confirm
315 memcpy( nm->payload, old_msg->payload, old_msg->len );
321 This will clone a message with a change to the trace area in the header such that
322 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
323 The original message will be left unchanged, and a pointer to the new message is returned.
324 It is not possible to realloc buffers and change the data sizes.
// Clone old_msg into a new transport buffer whose trace area is tr_len bytes. The new
// buffer length is the old allocated length adjusted by the difference between the new
// and old trace lengths; header, data1/data2 areas and payload are copied across.
// NOTE(review): declarations of hdr, v1hdr, mlen and state, the case labels, the line that
// re-points hdr at the new header, and the failure/return paths are not visible in this view.
326 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
327 rmr_mbuf_t* nm; // new message buffer
332 int tr_old_len; // tr size in new buffer
334 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
336 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
339 memset( nm, 0, sizeof( *nm ) );
341 hdr = old_msg->header;
342 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
344 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
345 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
346 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
347 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
351 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
352 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
353 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr still equals old_msg->header here (see assignment above), so unless it
// is re-pointed in a line not visible, this memcpy has identical source and destination and
// the payload pointer below lands in the old buffer -- verify.
355 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
356 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
359 default: // current message version always caught here
361 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
362 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
364 if( RMR_D1_LEN( hdr ) ) {
365 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
367 if( RMR_D2_LEN( hdr ) ) {
368 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
371 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
375 // --- these are all version agnostic -----------------------------------
376 nm->mtype = old_msg->mtype;
377 nm->sub_id = old_msg->sub_id;
378 nm->len = old_msg->len; // length of data in the payload
379 nm->alloc_len = mlen; // length of allocated payload
381 nm->xaction = &hdr->xid[0]; // point at transaction id in header area
382 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
383 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
384 memcpy( nm->payload, old_msg->payload, old_msg->len );
390 Realloc the message such that the payload is at least payload_len bytes. If the current
391 payload size is large enough, no action is taken. If copy is false, the actual payload
392 bytes are NOT copied. This allows a caller to realloc for a response message (to retain
393 the source information which would be lost on a simple alloc) which has no need for the
396 The old message buffer will reference the new underlying transport, and the original payload
397 will be lost unless clone is set to true. If clone is true, the old message buffer will continue
398 to reference the original payload, and a new message buffer will be allocated (even if the
399 payload size in the old message was larger than requested).
401 The return value is a pointer to the message with at least payload_len bytes allocated. It
402 will be the same as the old_message if clone is false.
405 If the message is not a message which was received, the mtype, sub-id, length values in the
406 RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
407 mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
408 resetting information in the mbuffer, this function will reset the mbuf values from the current
409 settings and NOT from the copied RMR header in transport buffer.
// Ensure a message has room for at least payload_len payload bytes, allocating a new
// transport buffer when the current one is too small or when a clone is requested.
//   old_msg     - message to grow or clone; NULL is rejected
//   payload_len - required payload size; values <= 0 are rejected
//   copy        - when true, header plus old payload are copied; otherwise header only
//   clone       - when true, a separate mbuf is allocated for the new buffer
// NOTE(review): several declarations (mlen, state, old_sid, old_len), the if/else headers
// selecting clone vs. in-place, and all return statements are not visible in this view.
411 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
412 rmr_mbuf_t* nm = NULL; // new message buffer when cloning
415 uta_mhdr_t* omhdr; // old message header
417 int tr_old_len; // tr size in new buffer
418 int old_psize = 0; // current size of message for payload
419 int hdr_len = 0; // length of RMR header in old msg
420 void* old_tp_buf; // pointer to the old tp buffer
421 int free_tp = 1; // free the transport buffer (old) when done (when not cloning)
422 int old_mt; // msg type and sub-id from the message passed in
426 if( old_msg == NULL || payload_len <= 0 ) {
// remember the caller visible values; they are forced back into the result further down
431 old_mt = old_msg->mtype;
432 old_sid = old_msg->sub_id;
433 old_len = old_msg->len;
434 old_psize = old_msg->alloc_len - RMR_HDR_LEN( old_msg->header ); // allocated transport size less the header and other data bits
435 if( !clone && payload_len <= old_psize ) { // old message is large enough, nothing to do
436 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
440 hdr_len = RMR_HDR_LEN( old_msg->header );
441 old_tp_buf = old_msg->tp_buf;
444 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
447 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
449 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
452 memset( nm, 0, sizeof( *nm ) );
457 omhdr = old_msg->header;
458 mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
460 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
461 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
462 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
466 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
// NOTE(review): SET_HDR_LEN is applied to the new header before anything has been copied
// into it; the memcpy below then overwrites that region -- confirm the macro's effect here.
467 SET_HDR_LEN( nm->header );
469 if( copy ) { // if we need to copy the old payload too
470 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
471 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
472 } else { // just need to copy header
473 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
474 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
477 ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
480 nm->mtype = -1; // didn't copy payload, so mtype and sub-id are invalid
482 nm->len = 0; // and len is 0
484 nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message
486 nm->sub_id = old_sid;
// NOTE(review): old_tp_buf is old_msg->tp_buf, and transport buffers in this file are
// obtained with nng_msg_alloc() and released elsewhere with nng_msg_free(); releasing it
// with free() looks mismatched -- verify.
490 free( old_tp_buf ); // we did not clone, so free b/c no references
497 This is the receive work horse used by the outer layer receive functions.
498 It waits for a message to be received on our listen socket. If old msg
499 is passed in, then we assume we can use it instead of allocating a new
500 one, else a new block of memory is allocated.
502 This allocates a zero copy message so that if the user wishes to call
503 rmr_rts_msg() the send is zero copy.
505 The nng timeout on send is at the ms level which is a tad too long for
506 our needs. So, if NNG returns eagain or timedout (we don't set one)
507 we will loop up to 5 times with a 10 microsecond delay between each
508 attempt. If at the end of this set of retries NNG is still saying
509 eagain/timeout we'll return to the caller with that set in errno.
510 Right now this is only for zero-copy buffers (they should all be zc
514 In the NNG msg world it must allocate the receive buffer rather
515 than accepting one that we allocated from their space and could
516 reuse. They have their reasons I guess. Thus, we will free
517 the old transport buffer if user passes the message in; at least
518 our mbuf will be reused.
520 When msg->state is not ok, this function must set tp_state in the message as some API
521 functions return the message directly and do not propagate errno into the message.
// Blocking receive of one RMR message on ctx->nn_sock. Any transport buffer already
// attached to the reused mbuf is released, NNG supplies a fresh one, and ref_tpbuf()
// is used to point the mbuf fields into it when at least a v1 sized header arrived.
// On a failed receive tp_state is set from errno.
// NOTE(review): the lines that assign old_msg to msg, the declarations of state and hdr,
// the else headers and the return statements are not visible in this view.
523 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
525 rmr_mbuf_t* msg = NULL; // msg received
527 size_t rsize; // nng needs to write back the size received... grrr
531 if( msg->tp_buf != NULL ) {
532 nng_msg_free( msg->tp_buf );
537 msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
546 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
547 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
548 msg->tp_state = errno;
553 if( msg->tp_buf == NULL ) { // if state is good this _should_ not be nil, but paranoia says check anyway
554 msg->state = RMR_ERR_EMPTY;
559 rsize = nng_msg_len( msg->tp_buf );
560 if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here
561 ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed
562 hdr = (uta_mhdr_t *) msg->header;
563 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
565 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
566 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
// too short to hold even a v1 header: report empty and leave type/sub-id unset
568 msg->state = RMR_ERR_EMPTY;
571 msg->alloc_len = rsize;
574 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
575 msg->mtype = UNSET_MSGTYPE;
576 msg->sub_id = UNSET_SUBID;
583 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
584 message buffer cannot be used to send, and the length information may or may
585 not be correct (it is set to the length received which might be more than the
586 bytes actually in the payload).
588 Mostly this supports the route table collector, but could be extended with an
589 API external function.
// Blocking receive of a raw (header-less) buffer on ctx->nn_sock. The whole received
// buffer is exposed as the payload, len and alloc_len are both the received size, and
// the mbuf is flagged MFL_RAW.
// NOTE(review): the handling of old_msg, the error return and the final return are not
// visible in this view.
591 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
593 rmr_mbuf_t* msg = NULL; // msg received
594 size_t rsize; // nng needs to write back the size received... grrr
// NOTE(review): alloc_zcmsg() attaches a transport buffer to msg->tp_buf, and the
// nng_recvmsg() call below stores a new buffer pointer into the same field; unless the
// first buffer is released in a line not visible here it appears to be leaked -- verify.
599 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
602 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
603 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
606 rsize = nng_msg_len( msg->tp_buf );
608 // do NOT use ref_tpbuf() here! Must fill these in manually.
609 msg->header = nng_msg_body( msg->tp_buf );
610 msg->len = rsize; // len is the number of bytes received
611 msg->alloc_len = rsize;
612 msg->mtype = UNSET_MSGTYPE; // raw message has no type
613 msg->sub_id = UNSET_SUBID; // nor a subscription id
// plain assignment (not |=): any flags set by alloc_zcmsg, including MFL_ZEROCOPY, are dropped
615 msg->flags = MFL_RAW;
616 msg->payload = msg->header; // payload is the whole thing; no header
619 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
625 This does the hard work of actually sending the message to the given socket. On success,
626 a new message struct is returned. On error, the original msg is returned with the state
627 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
628 buffer will not be allocated and returned (mostly for call() internal processing since
629 the return message from call() is a received buffer, not a new one).
631 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
632 validation has been done prior.
634 When msg->state is not ok, this function must set tp_state in the message as some API
635 functions return the message directly and do not propagate errno into the message.
// Push one message onto nn_sock. The mbuf's mtype/sub_id/len are written into the RMR
// header in network byte order, the source fields are overlaid when MFL_ADDSRC is set,
// and the buffer is sent non-blocking, retrying on NNG_EAGAIN / NNG_ETIMEDOUT while
// retries remain.
//   ctx     - context (supplies my_name / my_ip and is passed to alloc_zcmsg)
//   msg     - message to send; assumed already validated by the caller
//   nn_sock - destination socket
//   retries - retry budget; values <= 0 mean a failed send is not retried
// NOTE(review): declarations of state and hdr, the do-loop header, several else branches
// and some return statements are not visible in this view.
637 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
640 int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
641 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
642 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
644 // future: ensure that application did not overrun the XID buffer; last byte must be 0
646 hdr = (uta_mhdr_t *) msg->header;
647 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
648 hdr->sub_id = htonl( msg->sub_id );
649 hdr->plen = htonl( msg->len );
650 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
652 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
653 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
654 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
664 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
666 if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
668 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
669 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
671 if( retries > 0 ) { // only if we'll loop through again
672 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
677 state = 0; // don't loop
678 //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
683 msg->header = NULL; // nano frees; don't risk accessing later by mistake
687 } while( state && retries > 0 );
689 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
690 msg->state = RMR_ERR_SENDFAILED;
692 msg->tp_state = errno;
// NOTE(review): the copy-send length uses sizeof( uta_mhdr_t ) rather than the full header
// length (trace/d1/d2 included) used elsewhere via RMR_HDR_LEN -- verify.
696 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
698 //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
703 if( msg->state == RMR_OK ) { // successful send
704 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
705 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
707 rmr_free_msg( msg ); // not wanting a message back, trash this one
710 } else { // send failed -- return original message
711 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
713 msg->state = RMR_ERR_RETRY; // errno will have nano reason
715 msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
// NOTE(review): by this point msg->state holds an RMR state, not an errno value, so the
// strerror() text in this debug line is presumably unrelated to the failure -- verify.
718 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
725 send message with maximum timeout.
726 Accept a message and send it to an endpoint based on message type.
727 If NNG reports that the send attempt timed out, or should be retried,
728 RMr will retry for approximately max_to microseconds; rounded to the next
731 Allocates a new message buffer for the next send. If a message type has
732 more than one group of endpoints defined, then the message will be sent
733 in round robin fashion to one endpoint in each group.
735 An endpoint will be looked up in the route table using the message type and
736 the subscription id. If the subscription id is "UNSET_SUBID", then only the
737 message type is used. If the initial lookup, with a subid, fails, then a
738 second lookup using just the mtype is tried.
740 When msg->state is not OK, this function must set tp_state in the message as
741 some API functions return the message directly and do not propagate errno into
744 CAUTION: this is a non-blocking send. If the message cannot be sent, then
745 it will return with an error and errno set to eagain. If the send is
746 a limited fanout, then the returned status is the status of the last
// Route and send a message: look up the route table entry for (sub_id, mtype), then
// send to one endpoint per round-robin group (or to a MEID selected endpoint when the
// entry has no groups), cloning the message when more groups remain. Per-endpoint send
// counters are updated from the resulting state.
//   vctx   - context pointer (cast to uta_ctx_t*); NULL is rejected with EINVAL
//   msg    - message to send; NULL is rejected with EINVAL
//   max_to - retry budget handed to send_msg (replaced by ctx->send_retries at line 782;
//            the condition guarding that is not visible here)
// Returns the message carrying the status of the last/only send attempt.
// NOTE(review): the declaration of ctx, several if/else headers, case labels, counter
// updates and closing braces are not visible in this view.
750 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
751 endpoint_t* ep; // end point that we're attempting to send to
752 rtable_ent_t* rte; // the route table entry which matches the message key
753 nng_socket nn_sock; // endpoint socket for send
755 int group; // selected group to get socket for
756 int send_again; // true if the message must be sent again
757 rmr_mbuf_t* clone_m; // cloned message for an nth send
758 int sock_ok; // got a valid socket from round robin select
760 int ok_sends = 0; // track number of ok sends
762 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
763 errno = EINVAL; // if msg is null, this is their clue
765 msg->state = RMR_ERR_BADARG;
766 errno = EINVAL; // must ensure it's not eagain
767 msg->tp_state = errno;
772 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
773 if( msg->header == NULL ) {
// NOTE(review): this is the only diagnostic in the function written with fprintf( stderr )
// instead of rmr_vlog() -- confirm whether that is deliberate.
774 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
775 msg->state = RMR_ERR_NOHDR;
776 errno = EBADMSG; // must ensure it's not eagain
777 msg->tp_state = errno;
782 max_to = ctx->send_retries; // convert to retries
785 if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
786 if( ctx->flags & CTXFL_WARN ) {
787 rmr_vlog( RMR_VL_WARN, "no endpoint for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
789 msg->state = RMR_ERR_NOENDPT;
790 errno = ENXIO; // must ensure it's not eagain
791 msg->tp_state = errno;
792 return msg; // caller can resend (maybe) or free
795 send_again = 1; // force loop entry
796 group = 0; // always start with group 0
797 while( send_again ) {
798 if( rte->nrrgroups > 0 ) { // this is a round robin entry
799 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
// entry has no round robin groups: pick the endpoint via epsock_meid()
801 sock_ok = epsock_meid( ctx->rtable, msg, &nn_sock, &ep );
805 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
806 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
810 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
812 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
813 if( clone_m == NULL ) {
814 msg->state = RMR_ERR_SENDFAILED;
816 msg->tp_state = errno;
817 if( ctx->flags & CTXFL_WARN ) {
818 rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
823 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
824 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
825 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
827 if( msg != NULL ) { // returned message indicates send error of some sort
828 rmr_free_msg( msg ); // must ditch one; pick msg so we don't have to unfiddle flags
832 msg = clone_m; // clone will be the next to send
835 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
838 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" );
// per-endpoint accounting keyed on the state of the message that came back from the send
843 if( ep != NULL && msg != NULL ) {
844 switch( msg->state ) {
846 ep->scounts[EPSC_GOOD]++;
850 ep->scounts[EPSC_TRANS]++;
854 ep->scounts[EPSC_FAIL]++;
859 if( ctx->flags & CTXFL_WARN ) {
860 rmr_vlog( RMR_VL_WARN, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
862 msg->state = RMR_ERR_NOENDPT;
867 if( msg ) { // call functions don't get a buffer back, so a nil check is required
868 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
869 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
873 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
875 msg->tp_state = errno;
878 return msg; // last message carries the status of last/only send attempt
883 A generic wrapper to the real send to keep wormhole stuff agnostic.
884 We assume the wormhole function vetted the buffer so we don't have to.
// Thin wrapper that sends msg on the endpoint's socket via send_msg(). The retry count
// passed is -1; send_msg only retries while retries > 0, so a transient failure is
// reported back to the caller rather than retried here.
886 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
887 return send_msg( ctx, msg, ep->nn_sock, -1 );