1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_si_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs SI95).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
// Debugging aid: writes a hex dump of n bytes starting at p to stderr, prefixed by a
// label line; output is 16 bytes per row, each row starting with its byte offset.
// NOTE(review): label is printed with %s; confirm callers never pass NULL.
34 static void dump_n( char *p, char* label, int n ) {
42 fprintf( stderr, ">>>>> %s p=%p %d bytes\n", label, p, n );
// number of 16-byte rows; a partial final row still needs its own row
45 rows = (n/16) + ((n % 16) ? 1 : 0);
47 for( j = 0; j < rows; j++ ) {
48 fprintf( stderr, "%04x: ", j * 16 );
// t counts bytes emitted overall, so the last (partial) row stops exactly at n
50 for( i = 0; t < n && i < 16; i++, t++ ) {
51 fprintf( stderr, "%02x ", (unsigned char) *p );
54 fprintf( stderr, "\n" );
59 backwards compatibility.
// Convenience wrapper kept for backwards compatibility: hex dumps exactly 40 bytes at p.
61 static void dump_40( char *p, char* label ) {
62 dump_n( p, label, 40 );
66 Translates the nng state passed in to one of ours that is suitable to put
67 into the message, and sets errno to something that might be useful.
68 If we don't have a specific RMr state, then we return the default (e.g.
71 The addition of the connection shut error code to the switch requires
72 that the NNG version at commit e618abf8f3db2a94269a (or after) be
73 used for compiling RMR.
// Translate a transport state into an RMR state suitable for msg->state (also setting
// errno); def_state is returned when no specific RMR state applies.
75 static inline int xlate_si_state( int state, int def_state ) {
114 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
115 a new message struct as well. Size is the size of the zc buffer to allocate (not
116 including our header). If size is 0, then the buffer allocated is the size previously
117 allocated (if msg is !nil) or the default size given at initialisation.
119 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
120 the context value is used.
122 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
123 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Prepare a sendable zero-copy message. msg is reused when given; otherwise an mbuf is
// taken from ctx->zcb_mring or malloc'd. The transport buffer is sized for the transport
// header, RMR header, trace data (trlo when > 0, else ctx->trace_data_len), d1/d2 data
// and the payload (size when > 0, else ctx->max_plen); an existing buffer is kept when
// it is already large enough. state is copied into msg->state.
// Returns NULL when a new mbuf cannot be allocated; aborts if the transport buffer
// itself cannot be allocated.
126 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
127 size_t mlen = -1; // size of the transport buffer that we'll allocate
128 uta_mhdr_t* hdr; // convenience pointer
129 int tr_len; // trace data len (default or override)
130 int* alen; // convenience pointer to set allocated len
132 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
134 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
135 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
136 mlen = sizeof( char ) * (mlen + TP_HDR_LEN); // finally add the transport header len
138 if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
139 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
141 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
142 return NULL; // we used to exit -- that seems wrong
144 memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below
145 } else { // user message or message from the ring
146 if( mlen > msg->alloc_len ) { // current allocation is too small
147 msg->alloc_len = 0; // force tp_buffer realloc below
152 mlen = msg->alloc_len; // msg given, allocate the same size as before
156 msg->rts_fd = -1; // must force to be invalid; not a received message that can be returned
158 if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
159 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
160 abort( ); // toss out a core file for this
164 memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitalised use if we don't set
// The eyecatcher literal is only 34 characters long; copying TP_HDR_LEN bytes from it
// (documented in this file as 50) read past the end of the string literal.
165 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", (TP_HDR_LEN < 34 ? TP_HDR_LEN : 34) ); // NOT for production -- debugging eyecatcher
167 alen = (int *) msg->tp_buf;
168 *alen = mlen; // FIX ME: need a struct to go in these first bytes, not just dummy len
170 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
171 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
172 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
173 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
174 hdr->sub_id = htonl( UNSET_SUBID );
175 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
176 SET_HDR_TR_LEN( hdr, tr_len ); // must match the trace length used to size mlen (honours the trlo override)
177 SET_HDR_D1_LEN( hdr, ctx->d1_len );
178 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
180 msg->len = 0; // length of data in the payload
181 msg->cookie = 0x4942;
182 msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
183 msg->sub_id = UNSET_SUBID;
184 msg->mtype = UNSET_MSGTYPE;
185 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
186 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
187 msg->state = state; // fill in caller's state (likely the state of the last operation)
188 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
189 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
190 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
191 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
193 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
199 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
200 transport receive should allocate that on its own.
// Obtain a bare mbuf for a receive. One is taken from ctx->zcb_mring when available
// (its old transport buffer is freed), otherwise one is malloc'd. No transport buffer is
// attached here; the transport receive path supplies it. Returns NULL if malloc fails.
202 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
204 uta_mhdr_t* hdr; // convenience pointer
// NOTE(review): hdr is not referenced in any visible line of this function; confirm it is still needed
207 if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
209 free( msg->tp_buf ); // caller doesn't want it -- future put this on an accumulation ring
212 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
213 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
214 return NULL; // this used to exit, but that seems wrong
218 memset( msg, 0, sizeof( *msg ) );
220 msg->cookie = 0x4942;
221 msg->sub_id = UNSET_SUBID;
222 msg->mtype = UNSET_MSGTYPE;
225 msg->len = -1; // no payload; invalid len
231 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
237 This accepts a message with the assumption that only the tp_buf pointer is valid. It
238 sets all of the various header/payload/xaction pointers in the mbuf to the proper
239 spot in the transport layer buffer. The len in the header is assumed to be the
240 allocated len (a receive buffer that nng created);
242 The alen parm is the assumed allocated length; assumed because it's a value likely
243 to have come from si receive and the actual alloc len might be larger, but we
244 can only assume this is the total usable space. Because we are managing a transport
245 header in the first n bytes of the real msg, we must adjust this length down by the
246 size of the tp header (for testing 50 bytes, but this should change to a struct if
247 we adopt this interface).
249 This function returns the message with an error state set if it detects that the
250 received message might have been truncated. Check is done here as the calculation
251 is somewhat based on header version.
// Point msg's header/payload/xaction at the transport buffer already held in msg->tp_buf,
// and fill len, alloc_len, mtype and sub_id from the RMR header found there. Version 1
// headers (whose rmr_ver was not in network byte order) are detected and repaired first.
// Sets msg->state to RMR_ERR_TRUNC and trims msg->len when the sender's payload length
// exceeds the space implied by alen.
253 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
254 uta_mhdr_t* hdr = NULL; // current header
255 uta_v1mhdr_t* v1hdr; // version 1 header
257 int hlen; // header len to use for a truncation check
// the RMR header follows the transport header occupying the first TP_HDR_LEN bytes
259 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
261 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
263 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
265 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
267 ver = ntohl( v1hdr->rmr_ver );
272 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
273 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
274 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
276 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
277 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
278 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
279 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
281 hlen = sizeof( uta_v1mhdr_t );
284 default: // current version always lands here
285 hdr = (uta_mhdr_t *) msg->header;
286 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
287 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
289 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
290 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
291 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
292 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
293 msg->sub_id = ntohl( hdr->sub_id );
294 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): alloc_len is alen unadjusted although the first TP_HDR_LEN bytes of tp_buf hold
// the transport header; unless callers pass alen without it, this check permits up to
// TP_HDR_LEN more payload bytes than the buffer holds. Confirm against the receive path.
298 if( msg->len > (msg->alloc_len - hlen ) ) {
299 msg->state = RMR_ERR_TRUNC;
300 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
307 This will clone a message into a new zero copy buffer and return the cloned message.
// Deep copy old_msg into a newly allocated mbuf with its own transport buffer: the RMR
// header (plus trace and data sections for current-version headers), payload bytes,
// mtype, sub_id, len, state and flags (MFL_ZEROCOPY forced on). Returns the new message.
309 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
310 rmr_mbuf_t* nm; // new message buffer
316 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
318 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
321 memset( nm, 0, sizeof( *nm ) );
323 mlen = old_msg->alloc_len; // length allocated before
324 if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
325 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
329 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
330 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
331 switch( ntohl( v1hdr->rmr_ver ) ) {
// v1hdr still references the OLD header here, so the header must be copied into nm->header;
// copying onto v1hdr was a self-copy that left the new header uninitialised and aimed
// nm->payload into old_msg's buffer.
333 memcpy( nm->header, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
334 nm->payload = (void *) ((char *) nm->header + sizeof( *v1hdr ));
337 default: // current message always caught here
339 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
340 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
344 // --- these are all version agnostic -----------------------------------
345 nm->mtype = old_msg->mtype;
346 nm->sub_id = old_msg->sub_id;
347 nm->len = old_msg->len; // length of data in the payload
348 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): rts_fd keeps the 0 left by the memset above unless set elsewhere; alloc_zcmsg
// forces -1 and realloc_payload copies the old value -- confirm what a clone should carry.
// NOTE(review): confirm hdr references nm's header on the v1 path as well, or xaction is wrong there.
350 nm->xaction = &hdr->xid[0]; // reference xaction
351 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
352 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
353 memcpy( nm->payload, old_msg->payload, old_msg->len );
359 This will clone a message with a change to the trace area in the header such that
360 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
361 The original message will be left unchanged, and a pointer to the new message is returned.
362 It is not possible to realloc buffers and change the data sizes.
// Clone old_msg into a new transport buffer whose trace area is tr_len bytes. The header,
// d1/d2 data and payload are copied; the new trace area is not (it is zeroed by the
// memset below). old_msg is left unchanged and the new message is returned.
364 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
365 rmr_mbuf_t* nm; // new message buffer
370 int tr_old_len; // trace size in the old buffer
371 int* alen; // convenience pointer to set total xmit len FIX ME!
372 int tpb_len; // total transmit buffer len (user space, rmr header and tp header)
374 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
376 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
379 memset( nm, 0, sizeof( *nm ) );
381 hdr = old_msg->header;
382 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
384 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
385 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
387 tpb_len = mlen + TP_HDR_LEN;
388 if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
// log the size that could not be allocated (previously logged the constant ENOMEM)
389 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", tpb_len );
392 memset( nm->tp_buf, 0, tpb_len );
393 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // DEBUGGING
394 alen = (int *) nm->tp_buf;
395 *alen = tpb_len; // FIX ME: need a struct to go in these first bytes, not just dummy len
397 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
399 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
400 switch( ntohl( v1hdr->rmr_ver ) ) {
// v1hdr still references the OLD header here, so the header must be copied into nm->header;
// copying onto v1hdr was a self-copy that left the new header uninitialised and aimed
// nm->payload into old_msg's buffer.
402 memcpy( nm->header, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
403 nm->payload = (void *) ((char *) nm->header + sizeof( *v1hdr ));
406 default: // current message version always caught here
408 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
409 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
411 if( RMR_D1_LEN( hdr ) ) {
412 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
414 if( RMR_D2_LEN( hdr ) ) {
415 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
418 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
422 // --- these are all version agnostic -----------------------------------
423 nm->mtype = old_msg->mtype;
424 nm->sub_id = old_msg->sub_id;
425 nm->len = old_msg->len; // length of data in the payload
426 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): hdr was set to old_msg->header above; unless reassigned on the v1 path,
// xaction would reference the old message there -- confirm.
428 nm->xaction = &hdr->xid[0]; // reference xaction
429 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
430 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
431 memcpy( nm->payload, old_msg->payload, old_msg->len );
437 Realloc the message such that the payload is at least payload_len bytes.
438 The clone and copy options affect what portion of the original payload is copied to
439 the reallocated message, and whether or not the original payload is lost after the
440 reallocation process has finished.
443 The entire payload from the original message will be copied to the reallocated
447 Only the header (preserving return to sender information, message type, etc)
448 is preserved after reallocation; the payload used length is set to 0 and the
449 payload is NOT initialised/cleared.
452 The original message is preserved and a completely new message buffer and payload
453 are allocated (even if the size given is the same). A pointer to the new message
454 buffer is returned and it is the user application's responsibility to manage the
455 old buffer (e.g. free when not needed).
458 The old payload will be lost after reallocation. The message buffer pointer which
459 is returned will likely reference the same structure (don't depend on that).
463 If the message is not a message which was received, the mtype, sub-id, length values in the
464 RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
465 mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
466 resetting information in the mbuffer, this function will reset the mbuf values from the current
467 settings and NOT from the copied RMR header in transport buffer.
// Ensure the message can carry payload_len bytes of payload. When not cloning and the
// existing payload space already suffices, nothing is reallocated. Otherwise a new
// transport buffer (and, when cloning, a new mbuf) is allocated. Either the whole old
// header+payload (copy) or only the RMR header is copied into it. The mbuf's
// mtype/sub_id/len/rts_fd are restored from the values captured on entry, not from the
// copied header.
469 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
470 rmr_mbuf_t* nm = NULL; // new message buffer when cloning
472 uta_mhdr_t* omhdr; // old message header
// NOTE(review): tr_old_len is not referenced in any visible line of this function
473 int tr_old_len; // tr size in new buffer
474 int old_psize = 0; // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
475 int hdr_len = 0; // length of RMR and transport headers in old msg
476 void* old_tp_buf; // pointer to the old tp buffer
477 int free_tp = 1; // free the transport buffer (old) when done (when not cloning)
478 int old_mt; // msg type and sub-id from the message passed in
481 int old_rfd; // rts file descriptor from old message
483 if( old_msg == NULL || payload_len <= 0 ) {
// capture mbuf-only values up front; the copied RMR header may not reflect them
488 old_mt = old_msg->mtype; // preserve mbuf info
489 old_sid = old_msg->sub_id;
490 old_len = old_msg->len;
491 old_rfd = old_msg->rts_fd;
493 old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN); // user payload size in orig message
495 if( !clone && payload_len <= old_psize ) { // not cloning and old is large enough; nothing to do
496 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
500 hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN; // with SI we manage the transport header; must include in len
501 old_tp_buf = old_msg->tp_buf;
504 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
507 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
509 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
512 memset( nm, 0, sizeof( *nm ) );
513 nm->rts_fd = old_rfd; // this is managed only in the mbuf; dup now
518 omhdr = old_msg->header;
519 mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
521 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
522 if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
523 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
528 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old
// NOTE(review): both branches below memcpy the old header over this, so this appears redundant
529 SET_HDR_LEN( nm->header );
531 if( copy ) { // if we need to copy the old payload too
532 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
533 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
534 } else { // just need to copy header
// NOTE(review): this debug line reads RMR_HDR_LEN from the new header before the copy below fills it
535 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
536 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
539 ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
542 nm->mtype = -1; // didn't copy payload, so mtype, sub-id, and rts fd are invalid
544 nm->len = 0; // and len is 0
546 nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message
548 nm->sub_id = old_sid;
552 free( old_tp_buf ); // we did not clone, so free b/c no references
559 For SI95 based transport all receives are driven through the threaded
560 ring and thus this function should NOT be called. If it is we will panic
561 and abort straight away.
// Direct receive is not used with SI95; receives go through the threaded ring. Reaching
// this function is a programming error and it writes a diagnostic to stderr (the header
// comment states it then aborts).
563 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
565 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" );
572 This does the hard work of actually sending the message to the given socket. On success,
573 a new message struct is returned. On error, the original msg is returned with the state
574 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
575 buffer will not be allocated and returned (mostly for call() internal processing since
576 the return message from call() is a received buffer, not a new one).
578 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
579 validation has been done prior.
581 When msg->state is not ok, this function must set tp_state in the message as some API
582 functions return the message directly and do not propagate errno into the message.
// Send msg on nn_sock. Type, sub-id and payload length are stored in the header in network
// byte order, and the total length is written into the first int of the transport buffer.
// SIsendt is then called; when it reports SI_ERR_BLOCKED and retries > 0 the send is
// retried, spinning up to spin_retries times before yielding the CPU.
// On success a fresh zero-copy buffer is returned (or, with MFL_NOALLOC, msg is freed). On
// failure the original msg is returned with state RMR_ERR_RETRY or RMR_ERR_SENDFAILED.
// NOTE(review): the header comment says tp_state must be set on failure; no visible line here sets it.
584 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
587 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
588 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
589 int tot_len; // total send length (hdr + user data + tp header)
591 // future: ensure that application did not overrun the XID buffer; last byte must be 0
593 hdr = (uta_mhdr_t *) msg->header;
594 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
595 hdr->sub_id = htonl( msg->sub_id );
596 hdr->plen = htonl( msg->len );
597 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
599 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
600 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
601 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
612 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
613 if( tot_len > msg->alloc_len ) {
614 tot_len = msg->alloc_len; // likely bad length from user :(
616 *((int*) msg->tp_buf) = tot_len;
618 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
619 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
621 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
622 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: error!! sent state=%d\n", state );
624 if( retries > 0 && state == SI_ERR_BLOCKED ) {
625 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
627 if( retries > 0 ) { // only if we'll loop through again
628 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
633 state = 0; // don't loop
636 if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
641 } while( state && retries > 0 );
643 if( msg->state == RMR_OK ) { // successful send
644 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
645 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
647 rmr_free_msg( msg ); // not wanting a message back, trash this one
650 } else { // send failed -- return original message
651 if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
653 msg->state = RMR_ERR_RETRY; // errno will have nano reason
655 msg->state = RMR_ERR_SENDFAILED;
// msg->state is an RMR state, not an errno value; describe the transport's errno instead
658 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( errno ) );
665 send message with maximum timeout.
666 Accept a message and send it to an endpoint based on message type.
667 If NNG reports that the send attempt timed out, or should be retried,
668 RMr will retry for approximately max_to microseconds; rounded to the next
671 Allocates a new message buffer for the next send. If a message type has
672 more than one group of endpoints defined, then the message will be sent
673 in round robin fashion to one endpoint in each group.
675 An endpoint will be looked up in the route table using the message type and
676 the subscription id. If the subscription id is "UNSET_SUBID", then only the
677 message type is used. If the initial lookup, with a subid, fails, then a
678 second lookup using just the mtype is tried.
680 When msg->state is not OK, this function must set tp_state in the message as
681 some API functions return the message directly and do not propagate errno into
684 CAUTION: this is a non-blocking send. If the message cannot be sent, then
685 it will return with an error and errno set to eagain. If the send is
686 a limited fanout, then the returned status is the status of the last
// Route msg by (sub_id, mtype) with fallback to mtype only, then send it to one endpoint in
// each round-robin group of the matching route table entry. When the entry has no rr
// groups, the endpoint is chosen via epsock_meid. When more groups remain, msg is cloned
// before each send so a copy survives for the next group. Per-endpoint good/transient/fail
// counters are updated, and a failed endpoint is flagged for reconnect. The returned message
// (if any) carries the status of the last send attempt with tp_state set from errno.
690 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
691 endpoint_t* ep; // end point that we're attempting to send to
692 rtable_ent_t* rte; // the route table entry which matches the message key
693 int nn_sock; // endpoint socket (fd in si case) for send
695 int group; // selected group to get socket for
696 int send_again; // true if the message must be sent again
697 rmr_mbuf_t* clone_m; // cloned message for an nth send
698 int sock_ok; // got a valid socket from round robin select
700 int ok_sends = 0; // track number of ok sends
702 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
703 errno = EINVAL; // if msg is null, this is their clue
705 msg->state = RMR_ERR_BADARG;
706 errno = EINVAL; // must ensure it's not eagain
707 msg->tp_state = errno;
712 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
713 if( msg->header == NULL ) {
714 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
715 msg->state = RMR_ERR_NOHDR;
716 errno = EBADMSG; // must ensure it's not eagain
717 msg->tp_state = errno;
722 max_to = ctx->send_retries; // convert to retries
725 if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
726 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
727 msg->state = RMR_ERR_NOENDPT;
728 errno = ENXIO; // must ensure it's not eagain
729 msg->tp_state = errno;
730 return msg; // caller can resend (maybe) or free
733 send_again = 1; // force loop entry
734 group = 0; // always start with group 0
735 while( send_again ) {
736 if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed
737 sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
739 sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
743 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
744 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
748 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
750 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
751 if( clone_m == NULL ) {
752 msg->state = RMR_ERR_SENDFAILED;
754 msg->tp_state = errno;
755 rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
759 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
760 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
761 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
763 if( msg != NULL ) { // returned message indicates send error of some sort
764 rmr_free_msg( msg ); // must ditch one; pick msg so we don't have to unfiddle flags
768 msg = clone_m; // clone will be the next to send
771 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
774 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" );
779 if( ep != NULL && msg != NULL ) {
780 switch( msg->state ) {
782 ep->scounts[EPSC_GOOD]++;
786 ep->scounts[EPSC_TRANS]++;
790 ep->scounts[EPSC_FAIL]++;
791 uta_ep_failed( ep ); // sending to ep failed; set up to reconnect
796 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
797 msg->state = RMR_ERR_NOENDPT;
802 if( msg ) { // call functions don't get a buffer back, so a nil check is required
803 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
804 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
808 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
810 msg->tp_state = errno;
813 return msg; // last message carries the status of last/only send attempt
818 A generic wrapper to the real send to keep wormhole stuff agnostic.
819 We assume the wormhole function vetted the buffer so we don't have to.
// Send msg directly to a known endpoint (wormhole path) using the endpoint's socket.
// retries is passed as -1; send_msg only retries when retries > 0, so this is a single attempt.
821 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
822 return send_msg( ctx, msg, ep->nn_sock, -1 );