1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2021 Nokia
5 Copyright (c) 2018-2021 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_si_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs SI95).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
// Debugging aid: writes a hex dump of the n bytes at p to stderr. A header line shows the
// label, pointer and byte count; each following row shows its hex offset and up to 16 bytes.
34 static void dump_n( char *p, char* label, int n ) {
42 fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
// number of 16-byte rows, rounded up so a partial final row is still printed
45 rows = (n/16) + ((n % 16) ? 1 : 0);
47 for( j = 0; j < rows; j++ ) {
48 fprintf( stderr, "[DUMP] %04x: ", j * 16 );
// t advances once per byte printed and stops the final row once n bytes have been shown
50 for( i = 0; t < n && i < 16; i++, t++ ) {
51 fprintf( stderr, "%02x ", (unsigned char) *p );
54 fprintf( stderr, "\n" );
59 backwards compatibility.
// Fixed-size wrapper around dump_n(): always dumps exactly 40 bytes starting at p.
61 static void dump_40( char *p, char* label ) {
62 dump_n( p, label, 40 );
66 Translates the nng state passed in to one of ours that is suitable to put
67 into the message, and sets errno to something that might be useful.
68 If we don't have a specific RMr state, then we return the default (e.g.
71 The addition of the connection shut error code to the switch requires
72 that the NNG version at commit e618abf8f3db2a94269a (or after) be
73 used for compiling RMR.
// Maps a transport state to an RMR state, returning def_state when there is no specific mapping
// (see the description above).
// NOTE(review): the description above still refers to nng, but this file sends via SI95.
// Confirm that the mapped codes are SI states.
75 static inline int xlate_si_state( int state, int def_state ) {
114 Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
115 this will put in the size such that it is compatible with old versions
116 of RMR (that expect the message size to not be in network byte order)
117 and with new versions that do. See extract function in mt_call_si_static.c
118 for details on what ends up in the buffer.
// Stores the message length len in the transport-header size field at the start of buf
// (buf must be at least TP_SZFIELD_LEN bytes; see the description above).
// NOTE(review): buf is reinterpreted as uint32_t*. This assumes buf is suitably aligned,
// which holds for malloc'd tp buffers. Confirm before using it with other buffers.
120 static inline void insert_mlen( uint32_t len, char* buf ) {
121 uint32_t* blen; // pointer into buffer where we'll add the len
123 blen = (uint32_t *) buf; // old systems expect an unconverted integer
127 *blen = htonl( len ); // new systems want a converted integer
// Zero the 4 bytes after the size field, then put TP_SZ_MARKER in the last byte of the field.
// The marker lets readers recognise a header written by a new-style sender.
129 memset( &buf[TP_SZFIELD_LEN], 0, 4 ); // clear to prevent future conversion issues
130 buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER; // marker to flag this is generated by a new message
134 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
135 a new message struct as well. Size is the size of the zc buffer to allocate (not
136 including our header). If size is 0, then the buffer allocated is the size previously
137 allocated (if msg is !nil) or the default size given at initialisation.
139 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
140 the context value is used.
142 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
143 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Builds a sendable zero-copy message.
//
// Buffer size: the transport buffer holds the RMR header, the trace area, the d1/d2 areas,
// the payload and the transport header.
//   - trace area: trlo bytes if trlo > 0, else ctx->trace_data_len
//   - payload: size bytes if size > 0, else ctx->max_plen
//
// Choosing the mbuf:
//   - If msg is NULL, an mbuf is taken from ctx->zcb_mring; if the ring is empty, one is malloc'd.
//   - An existing tp_buf is reused when it is large enough.
//
// Result: the RMR header fields, mbuf pointers, state and source name/ip are initialised.
// Returns NULL if the mbuf itself cannot be malloc'd; aborts if the transport buffer cannot be.
146 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
147 size_t mlen = -1; // size of the transport buffer that we'll allocate
148 uta_mhdr_t* hdr; // convenience pointer
149 int tr_len; // trace data len (default or override)
// NOTE(review): alen is not referenced in the lines of this function shown here; candidate for removal.
150 int* alen; // convenience pointer to set allocated len
152 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
154 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
155 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
156 mlen = sizeof( char ) * (mlen + TP_HDR_LEN); // finally add the transport header len
// no caller message: recycle an mbuf from the free ring, else malloc a fresh (zeroed) one
158 if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
159 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
161 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
162 return NULL; // we used to exit -- that seems wrong
164 memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below
165 } else { // user message or message from the ring
// If the existing buffer is too small, zero alloc_len so a new tp_buf is malloc'd below.
// Otherwise keep the existing buffer at its current size.
166 if( mlen > msg->alloc_len ) { // current allocation is too small
167 msg->alloc_len = 0; // force tp_buffer realloc below
173 mlen = msg->alloc_len; // msg given, allocate the same size as before
177 msg->rts_fd = -1; // must force to be invalid; not a received message that can be returned
// a transport buffer is malloc'd only when there is no reusable one (alloc_len == 0)
179 if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
180 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
181 abort( ); // toss out a core file for this
185 // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
186 memset( msg->tp_buf, 0, mlen );
187 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // do NOT use a $ in this string!
190 insert_mlen( (uint32_t) mlen, msg->tp_buf ); // this will likely be overwriten on send to shirnk
// the RMR header follows the transport header inside tp_buf
192 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
193 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
194 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
195 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
196 hdr->sub_id = htonl( UNSET_SUBID );
197 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): the buffer was sized with tr_len, which honours the trlo override, but the
// header records ctx->trace_data_len. When trlo differs, the trace length is not carried
// forward, e.g. when send_msg passes the previous message's trace length. Confirm whether
// tr_len was intended here.
198 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
199 SET_HDR_D1_LEN( hdr, ctx->d1_len );
200 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
202 msg->len = 0; // length of data in the payload
203 msg->cookie = 0x4942;
204 msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
205 msg->sub_id = UNSET_SUBID;
206 msg->mtype = UNSET_MSGTYPE;
207 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
208 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
209 msg->state = state; // fill in caller's state (likely the state of the last operation)
210 msg->flags = MFL_ZEROCOPY; // this is a zerocopy sendable message
211 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on eaach :(
212 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
213 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
215 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
221 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
222 transport receive should allocate that on its own.
// Returns an mbuf without a transport buffer; the transport receive attaches its own tp_buf.
//   - An mbuf from ctx->zcb_mring is reused when one is available, and its old tp_buf is freed.
//   - Otherwise a new mbuf is malloc'd; NULL is returned if that fails.
// The mbuf is then cleared, and cookie, sub_id, mtype, len (-1 = no payload) and ring are set.
// The cookie is the same value alloc_zcmsg uses.
224 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
// NOTE(review): hdr is not referenced in the lines of this function shown here; candidate for removal.
226 uta_mhdr_t* hdr; // convenience pointer
229 if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
231 free( msg->tp_buf ); // caller doesn't want it -- future put this on an accumulation ring
234 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
235 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
236 return NULL; // this used to exit, but that seems wrong
240 memset( msg, 0, sizeof( *msg ) );
242 msg->cookie = 0x4942;
243 msg->sub_id = UNSET_SUBID;
244 msg->mtype = UNSET_MSGTYPE;
247 msg->len = -1; // no payload; invalid len
253 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on eaach :(
259 This accepts a message with the assumption that only the tp_buf pointer is valid. It
260 sets all of the various header/payload/xaction pointers in the mbuf to the proper
261 spot in the transport layer buffer. The len in the header is assumed to be the
262 allocated len (a receive buffer that nng created);
264 The alen parm is the assumed allocated length; assumed because it's a value likely
265 to have come from si receive and the actual alloc len might be larger, but we
266 can only assume this is the total usable space. Because we are managing a transport
267 header in the first n bytes of the real msg, we must adjust this length down by the
268 size of the tp header (for testing 50 bytes, but this should change to a struct if
269 we adopt this interface).
271 This function returns the message with an error state set if it detects that the
272 received message might have been truncated. Check is done here as the calculation
273 is somewhat based on header version.
// Points msg->header, payload and xaction at their locations inside msg->tp_buf.
//   - The RMR header starts TP_HDR_LEN bytes into tp_buf.
//   - len, mtype and sub_id are loaded from the header and converted from network byte order.
//   - alen becomes msg->alloc_len.
// If the sender's length does not fit in alloc_len minus the header length, the state is set
// to RMR_ERR_TRUNC and len is clamped.
275 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
276 uta_mhdr_t* hdr = NULL; // current header
277 uta_v1mhdr_t* v1hdr; // version 1 header
279 int hlen; // header len to use for a truncation check
281 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
283 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
284 ver = ntohl( v1hdr->rmr_ver );
287 // version 1 is deprecated case 1:
288 // version 2 is deprecated case 2:
293 default: // current version always lands here
294 hdr = (uta_mhdr_t *) msg->header;
295 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
296 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
298 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
299 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
300 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
301 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
302 msg->sub_id = ntohl( hdr->sub_id );
303 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): this check is looser than the real payload capacity, for two reasons:
//   - alloc_len includes TP_HDR_LEN (callers pass the whole tp buffer length).
//   - hlen is only the fixed RMR header; send_msg uses PAYLOAD_OFFSET, which also counts
//     the trace/d1/d2 areas.
// The "adjust down by the tp header" step in the description above is not visible in this
// code. Confirm the intended bound.
307 if( msg->len > (msg->alloc_len - hlen ) ) {
308 msg->state = RMR_ERR_TRUNC;
309 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
316 This will clone a message into a new zero copy buffer and return the cloned message.
// Deep-copies old_msg into a fresh mbuf and transport buffer.
//   - Copies the RMR header with its trace, d1 and d2 areas, plus old_msg->len payload bytes.
//   - Copies mtype, sub_id, len and state; flags are copied with MFL_ZEROCOPY added.
// Allocation failures are logged; callers (e.g. mtosend_msg) check for a NULL return.
// The transport-header size field is not copied; send_msg rewrites it via insert_mlen before sending.
318 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
319 rmr_mbuf_t* nm; // new message buffer
325 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
327 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
330 memset( nm, 0, sizeof( *nm ) );
332 mlen = old_msg->alloc_len; // length allocated before
// NOTE(review): for buffers from alloc_zcmsg, alloc_len already includes TP_HDR_LEN, so this
// malloc over-allocates by TP_HDR_LEN bytes. That is harmless, but confirm the intent.
333 if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
334 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
338 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
339 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
340 switch( ntohl( v1hdr->rmr_ver ) ) {
341 // version 1 deprecated case 1:
342 // version 2 deprecated
346 default: // current message always caught here
348 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
349 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
353 // --- these are all version agnostic -----------------------------------
354 nm->mtype = old_msg->mtype;
355 nm->sub_id = old_msg->sub_id;
356 nm->len = old_msg->len; // length of data in the payload
357 nm->alloc_len = mlen; // length of allocated payload
359 nm->xaction = &hdr->xid[0]; // reference xaction
360 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
361 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
// only the used portion of the payload (old_msg->len bytes) is copied
362 memcpy( nm->payload, old_msg->payload, old_msg->len );
368 This will clone a message with a change to the trace area in the header such that
369 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
370 The original message will be left unchanged, and a pointer to the new message is returned.
371 It is not possible to realloc buffers and change the data sizes.
// Returns a copy of old_msg whose trace area is resized to tr_len bytes; old_msg is not modified.
//   - The buffer grows or shrinks by (tr_len - old trace length).
//   - Copies the fixed header, the d1/d2 areas and old_msg->len payload bytes.
//   - Trace contents are not copied (see the description above).
//   - Copies mtype, sub_id, len and state; flags are copied with MFL_ZEROCOPY added.
373 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
374 rmr_mbuf_t* nm; // new message buffer
// NOTE(review): despite its comment, tr_old_len holds the trace length of the OLD header (set below).
379 int tr_old_len; // tr size in new buffer
// NOTE(review): alen is not referenced in the lines of this function shown here.
380 int* alen; // convenience pointer to set toal xmit len FIX ME!
381 int tpb_len; // total transmit buffer len (user space, rmr header and tp header)
383 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
385 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
388 memset( nm, 0, sizeof( *nm ) );
390 hdr = old_msg->header;
391 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
393 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
394 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
// NOTE(review): alloc_len from alloc_zcmsg already counts TP_HDR_LEN, so adding it again
// over-allocates by TP_HDR_LEN bytes. Confirm the intent.
396 tpb_len = mlen + TP_HDR_LEN;
// NOTE(review): this log prints the ENOMEM constant, not the requested size (tpb_len).
397 if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
398 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
402 memset( nm->tp_buf, 0, tpb_len );
403 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // DEBUGGING do NOT use $ in this string!!
406 insert_mlen( (uint32_t) tpb_len, nm->tp_buf ); // this len will likely be reset on send to shrink
408 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
410 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
411 switch( ntohl( v1hdr->rmr_ver ) ) {
412 // version 1 not supported
413 // version 2 not supported
417 default: // current message version always caught here
419 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
420 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
// The d1/d2 lengths in the new header were copied from the old header, so each area is copied
// at the same size. Only its offset shifts with the new trace length.
422 if( RMR_D1_LEN( hdr ) ) {
423 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
425 if( RMR_D2_LEN( hdr ) ) {
426 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
429 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
433 // --- these are all version agnostic -----------------------------------
434 nm->mtype = old_msg->mtype;
435 nm->sub_id = old_msg->sub_id;
436 nm->len = old_msg->len; // length of data in the payload
437 nm->alloc_len = mlen; // length of allocated payload
439 nm->xaction = &hdr->xid[0]; // reference xaction
440 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
441 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
442 memcpy( nm->payload, old_msg->payload, old_msg->len );
448 Realloc the message such that the payload is at least payload_len bytes.
449 The clone and copy options affect what portion of the original payload is copied to
450 the reallocated message, and whether or not the original payload is lost after the
451 reallocation process has finished.
454 The entire payload from the original message will be copied to the reallocated
458 Only the header (preserving return to sender information, message type, etc)
459 is preserved after reallocation; the payload used length is set to 0 and the
460 payload is NOT initialised/cleared.
463 The original message is preserved and a completely new message buffer and payload
464 are allocated (even if the size given is the same). A pointer to the new message
465 buffer is returned and it is the user application's responsibility to manage the
466 old buffer (e.g. free when not needed).
469 The old payload will be lost after reallocation. The message buffer pointer which
470 is returned will likely reference the same structure (don't depend on that).
474 If the message is not a message which was received, the mtype, sub-id, length values in the
475 RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
476 mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
477 resetting information in the mbuffer, this function will reset the mbuf values from the current
478 settings and NOT from the copied RMR header in transport buffer.
// Ensures the message payload can hold at least payload_len bytes.
//   - Without clone, a payload that is already large enough is left as-is.
//   - Otherwise a new transport buffer is allocated: in a new mbuf when clone is set, else for
//     old_msg itself (see the description above).
//   - With copy, the header and the whole old payload area are copied; without it, only the
//     fixed header is copied.
//   - mtype/sub_id/len are restored from the saved mbuf values, not from the copied header.
//   - When not cloning, the old tp_buf is freed.
480 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
481 rmr_mbuf_t* nm = NULL; // new message buffer when cloning
483 uta_mhdr_t* omhdr; // old message header
484 int tr_old_len; // tr size in new buffer
485 int old_psize = 0; // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
486 int hdr_len = 0; // length of RMR and transport headers in old msg
487 void* old_tp_buf; // pointer to the old tp buffer
488 int free_tp = 1; // free the transport buffer (old) when done (when not cloning)
489 int old_mt; // msg type and sub-id from the message passed in
492 int old_rfd; // rts file descriptor from old message
494 if( old_msg == NULL || payload_len <= 0 ) {
// save mbuf values now; ref_tpbuf() below reloads them from the (possibly stale) copied header
499 old_mt = old_msg->mtype; // preserve mbuf info
500 old_sid = old_msg->sub_id;
501 old_len = old_msg->len;
502 old_rfd = old_msg->rts_fd;
// NOTE(review): only the fixed RMR header (RMR_HDR_LEN) is subtracted here, not the trace/d1/d2
// areas that PAYLOAD_OFFSET accounts for in send_msg. old_psize may overstate the usable payload,
// and the new buffer's usable payload may fall short of payload_len by those lengths. Confirm.
504 old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN); // user payload size in orig message
506 if( !clone && payload_len <= old_psize ) { // not cloning and old is large enough; nothing to do
507 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
511 hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN; // with SI we manage the transport header; must include in len
512 old_tp_buf = old_msg->tp_buf;
515 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
518 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
520 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
523 memset( nm, 0, sizeof( *nm ) );
524 nm->rts_fd = old_rfd; // this is managed only in the mbuf; dup now
529 omhdr = old_msg->header;
530 mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
532 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
533 if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
534 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
539 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old
// NOTE(review): both branches below memcpy the old header over nm->header, so the effect of
// this SET_HDR_LEN appears to be overwritten. Confirm whether it is needed.
540 SET_HDR_LEN( nm->header );
542 if( copy != 0 ) { // if we need to copy the old payload too
543 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
544 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
545 } else { // just need to copy header
546 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
547 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
550 ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
553 nm->mtype = -1; // didn't copy payload, so mtype, sub-id, and rts fd are invalid
555 nm->len = 0; // and len is 0
557 nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message
559 nm->sub_id = old_sid;
563 free( old_tp_buf ); // we did not clone, so free b/c no references
570 This does the hard work of actually sending the message to the given socket. On success,
571 a new message struct is returned. On error, the original msg is returned with the state
572 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
573 buffer will not be allocated and returned (mostly for call() internal processing since
574 the return message from call() is a received buffer, not a new one).
576 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
577 validation has been done prior.
579 When msg->state is not ok, this function must set tp_state in the message as some API
580 functions return the message directly and do not propagate errno into the message.
// Sends msg->tp_buf on nn_sock via SIsendt().
//
// Before sending:
//   - mtype, sub_id and the payload length are written into the RMR header in network byte order.
//   - Our source name/ip are overlaid when MFL_ADDSRC is set.
//   - Only TP_HDR_LEN + PAYLOAD_OFFSET(hdr) + msg->len bytes are sent, capped at alloc_len.
//
// Retries: a blocked send (SI_ERR_BLOCKED) is retried only while retries > 0. The loop spins
// up to spin_retries times before yielding the CPU with usleep.
//
// On success:
//   - Returns a new zero-copy buffer from alloc_zcmsg, using the same trace length.
//   - If MFL_NOALLOC is set, msg is freed instead.
// On failure: returns the original msg with state RMR_ERR_RETRY (blocked or EAGAIN) or
// RMR_ERR_SENDFAILED.
582 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
585 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
586 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
587 int tot_len; // total send length (hdr + user data + tp header)
589 // future: ensure that application did not overrun the XID buffer; last byte must be 0
591 hdr = (uta_mhdr_t *) msg->header;
592 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
593 hdr->sub_id = htonl( msg->sub_id );
594 hdr->plen = htonl( msg->len );
595 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
597 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
598 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
599 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
610 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
611 if( tot_len > msg->alloc_len ) {
612 tot_len = msg->alloc_len; // likely bad length from user :(
614 insert_mlen( tot_len, msg->tp_buf ); // shrink to fit
616 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
617 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
619 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
620 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: error!! sent state=%d\n", state );
622 if( retries > 0 && state == SI_ERR_BLOCKED ) {
623 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
625 if( retries > 0 ) { // only if we'll loop through again
626 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 miscrosec
631 state = 0; // don't loop
634 if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
639 } while( state && retries > 0 );
641 if( msg->state == RMR_OK ) { // successful send
642 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
643 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
645 rmr_free_msg( msg ); // not wanting a meessage back, trash this one
648 } else { // send failed or would block -- return original message
649 if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
651 msg->state = RMR_ERR_RETRY;
653 rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
654 msg->state = RMR_ERR_SENDFAILED;
// NOTE(review): msg->state is an RMR_ERR_* code, not an errno value, so strerror() here
// describes an unrelated system error. Consider logging errno, or the numeric state only.
657 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
664 send message with maximum timeout.
665 Accept a message and send it to an endpoint based on message type.
666 If NNG reports that the send attempt timed out, or should be retried,
667 RMr will retry for approximately max_to microseconds; rounded to the next
670 Allocates a new message buffer for the next send. If a message type has
671 more than one group of endpoints defined, then the message will be sent
672 in round robin fashion to one endpoint in each group.
674 An endpoint will be looked up in the route table using the message type and
675 the subscription id. If the subscription id is "UNSET_SUBID", then only the
676 message type is used. If the initial lookup, with a subid, fails, then a
677 second lookup using just the mtype is tried.
679 When msg->state is not OK, this function must set tp_state in the message as
680 some API functions return the message directly and do not propagate errno into
683 CAUTION: this is a non-blocking send. If the message cannot be sent, then
684 it will return with an error and errno set to eagain. If the send is
685 a limited fanout, then the returned status is the status of the last
// Routes msg and sends it.
//
// Route lookup: the entry is found in the active route table by sub_id/mtype, falling back
// to an mtype-only key.
//
// Endpoint selection:
//   - Round-robin entries (nrrgroups > 0): one endpoint per group via uta_epsock_rr(), which
//     also reports whether more groups remain.
//   - Otherwise the endpoint comes from epsock_meid().
//
// Sending: while more groups remain, msg is cloned before each send so the next group gets
// its own copy. Intermediate sends use MFL_NOALLOC.
//
// Returns the last message, with state and tp_state set. If at least one group send
// succeeded (ok_sends), it is reported as OK (see the description above). The route table
// reference taken by get_rt() is released on every exit path shown.
689 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
690 endpoint_t* ep; // end point that we're attempting to send to
691 rtable_ent_t* rte; // the route table entry which matches the message key
692 int nn_sock; // endpoint socket (fd in si case) for send
694 int group; // selected group to get socket for
695 int send_again; // true if the message must be sent again
696 rmr_mbuf_t* clone_m; // cloned message for an nth send
697 int sock_ok; // got a valid socket from round robin select
699 int ok_sends = 0; // track number of ok sends
700 route_table_t* rt; // active route table
702 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
703 errno = EINVAL; // if msg is null, this is their clue
// NOTE(review): this branch is entered when msg may be NULL. Confirm that msg is checked for
// NULL before the dereferences below.
705 msg->state = RMR_ERR_BADARG;
706 errno = EINVAL; // must ensure it's not eagain
707 msg->tp_state = errno;
712 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
713 if( msg->header == NULL ) {
// NOTE(review): this uses fprintf(stderr) where the rest of the file logs via rmr_vlog().
714 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
715 msg->state = RMR_ERR_NOHDR;
716 errno = EBADMSG; // must ensure it's not eagain
717 msg->tp_state = errno;
// NOTE(review): confirm the condition under which the caller's max_to is replaced by the context retry count.
722 max_to = ctx->send_retries; // convert to retries
725 rt = get_rt( ctx ); // get active route table and up ref count
726 if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
727 release_rt( ctx, rt );
728 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
729 msg->state = RMR_ERR_NOENDPT;
730 errno = ENXIO; // must ensure it's not eagain
731 msg->tp_state = errno;
732 return msg; // caller can resend (maybe) or free
735 send_again = 1; // force loop entry
736 group = 0; // always start with group 0
737 while( send_again ) {
738 if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed
739 sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
741 sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
745 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
746 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
750 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
// more groups remain: keep a copy because send_msg consumes msg
752 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
753 if( clone_m == NULL ) {
754 release_rt( ctx, rt );
755 msg->state = RMR_ERR_SENDFAILED;
757 msg->tp_state = errno;
758 rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
762 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
763 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
764 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
766 if( msg != NULL ) { // returned message indicates send error of some sort
767 rmr_free_msg( msg ); // must ditchone; pick msg so we don't have to unfiddle flags
771 msg = clone_m; // clone will be the next to send
775 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
778 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" );
784 incr_ep_counts( msg->state, ep );
787 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
788 msg->state = RMR_ERR_NOENDPT;
793 release_rt( ctx, rt ); // we can safely dec the ref counter now
795 if( msg ) { // call functions don't get a buffer back, so a nil check is required
796 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
797 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
801 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
803 msg->tp_state = errno;
806 return msg; // last message caries the status of last/only send attempt
811 A generic wrapper to the real send to keep wormhole stuff agnostic.
812 We assume the wormhole function vetted the buffer so we don't have to.
// Sends msg directly to endpoint ep's socket, bypassing route-table lookup.
// retries is passed as -1; send_msg retries blocked sends only when retries > 0, so a
// blocked send is not retried here.
814 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
815 return send_msg( ctx, msg, ep->nn_sock, -1 );