1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_si_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs SI95).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
/*
	Debugging aid: write a hex dump of the first n bytes at p to stderr.

	When label is not nil a heading line with the label, the buffer address and
	the byte count is written first. The bytes follow, 16 per row, each row
	prefixed with the (hex) offset of its first byte.

	Nothing beyond the optional heading is written when p is nil or n is not
	positive, so a bad length from a caller cannot cause a wild read.
*/
static void dump_n( char *p, char* label, int n ) {
	int i;			// byte index within the current row
	int j;			// row index
	int t = 0;		// total bytes written so far
	int rows;		// number of rows needed (last one may be short)

	if( label ) {
		fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, (void *) p, n );	// %p requires a void pointer
	}

	if( p == NULL || n <= 0 ) {
		return;
	}

	rows = (n/16) + ((n % 16) ? 1 : 0);

	for( j = 0; j < rows; j++ ) {
		fprintf( stderr, "[DUMP] %04x: ", j * 16 );

		for( i = 0; t < n && i < 16; i++, t++ ) {
			fprintf( stderr, "%02x ", (unsigned char) *p );
			p++;
		}
		fprintf( stderr, "\n" );
	}
}
59 backwards compatibility.
/*
	Fixed-length flavour of dump_n(): writes a hex dump of exactly the first
	40 bytes at p to stderr, passing label through unchanged.
*/
static void dump_40( char *p, char* label ) {
	const int fixed_len = 40;		// the historical dump size this wrapper is named for

	dump_n( p, label, fixed_len );
}
66 Translates the nng state passed in to one of ours that is suitable to put
67 into the message, and sets errno to something that might be useful.
68 If we don't have a specific RMr state, then we return the default (e.g.
71 The addition of the connection shut error code to the switch requires
72 that the NNG version at commit e618abf8f3db2a94269a (or after) be
73 used for compiling RMR.
// Takes a transport state code and a caller supplied default state and returns an int state.
// NOTE(review): only the signature is visible in this view; which transport codes are mapped,
// when def_state is returned, and how errno is set must be confirmed against the body.
75 static inline int xlate_si_state( int state, int def_state ) {
114 Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
115 this will put in the size such that it is compatible with old versions
116 of RMR (that expect the message size to not be in network byte order)
117 and with new versions that do. See extract function in mt_call_si_static.c
118 for details on what ends up in the buffer.
120 static inline void insert_mlen( uint32_t len, char* buf ) {
121 uint32_t* blen; // pointer into buffer where we'll add the len
123 blen = (uint32_t *) buf; // old systems expect an unconverted integer
127 *blen = htonl( len ); // new systems want a converted integer
129 buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER; // marker to flag this is generated by a new message
133 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
134 a new message struct as well. Size is the size of the zc buffer to allocate (not
135 including our header). If size is 0, then the buffer allocated is the size previously
136 allocated (if msg is !nil) or the default size given at initialisation).
138 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
139 the context value is used.
141 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
142 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Prepare a sendable message buffer.
//	ctx   - context supplying default lengths (trace_data_len, d1_len, d2_len, max_plen), the
//	        mbuf ring (zcb_mring) and the source name/ip strings copied into the header.
//	msg   - existing mbuf to refresh, or NULL to take one from ctx->zcb_mring or malloc one.
//	size  - requested payload size; when not > 0, ctx->max_plen is used.
//	state - value stored in msg->state.
//	trlo  - trace length override; used when > 0, otherwise ctx->trace_data_len.
// Returns NULL when an mbuf cannot be allocated; aborts when the transport buffer cannot be
// allocated. The normal return statement is not visible in this view (presumably msg).
145 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
146 size_t mlen = -1; // size of the transport buffer that we'll allocate
147 uta_mhdr_t* hdr; // convenience pointer
148 int tr_len; // trace data len (default or override)
// NOTE(review): alen is not referenced in any line visible here -- confirm and remove if unused
149 int* alen; // convenience pointer to set allocated len
151 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
// total transport buffer = RMR header + trace + d1 + d2 + payload + transport header
153 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
154 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
155 mlen = sizeof( char ) * (mlen + TP_HDR_LEN); // finally add the transport header len
// only when the caller gave no mbuf AND the ring is empty do we malloc a fresh mbuf
157 if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
158 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
160 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
161 return NULL; // we used to exit -- that seems wrong
// a zeroed mbuf has alloc_len == 0, which triggers the transport buffer malloc further down
163 memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below
164 } else { // user message or message from the ring
165 if( mlen > msg->alloc_len ) { // current allocation is too small
166 msg->alloc_len = 0; // force tp_buffer realloc below
// NOTE(review): presumably the other arm of the size test above; the lines between are not visible
172 mlen = msg->alloc_len; // msg given, allocate the same size as before
176 msg->rts_fd = -1; // must force to be invalid; not a received message that can be returned
// the malloc is attempted only when alloc_len is 0 (new mbuf, or existing buffer too small)
178 if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
179 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
180 abort( ); // toss out a core file for this
184 // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
// NOTE(review): the comment above implies a conditional-compile guard around the next two lines; it is not visible here
185 memset( msg->tp_buf, 0, mlen );
186 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // do NOT use a $ in this string!
189 insert_mlen( (uint32_t) mlen, msg->tp_buf ); // this will likely be overwritten on send to shrink
// the RMR header starts immediately after the transport header
191 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
192 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
193 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
194 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
195 hdr->sub_id = htonl( UNSET_SUBID );
196 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): the buffer was sized with tr_len (which honours trlo) but the header records
// ctx->trace_data_len; when trlo > 0 and differs these disagree -- confirm whether tr_len was intended
197 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
198 SET_HDR_D1_LEN( hdr, ctx->d1_len );
199 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
201 msg->len = 0; // length of data in the payload
202 msg->cookie = 0x4942;
203 msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
204 msg->sub_id = UNSET_SUBID;
205 msg->mtype = UNSET_MSGTYPE;
206 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
207 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
208 msg->state = state; // fill in caller's state (likely the state of the last operation)
209 msg->flags = MFL_ZEROCOPY; // this is a zerocopy sendable message
210 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
// NOTE(review): strncpy() does not add a terminator when the source is RMR_MAX_SRC bytes or longer;
// confirm the src/srcip field sizes and that my_name/my_ip are always shorter
211 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
212 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
214 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
220 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
221 transport receive should allocate that on its own.
// Obtain an mbuf with no transport buffer attached. An mbuf is taken from ctx->zcb_mring when one
// is available (its transport buffer is released); a malloc'd mbuf is used in the lines that follow.
// Returns NULL (after a critical log message) when malloc fails.
// NOTE(review): the use of the state parameter and the final return are not visible in this view.
223 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
// NOTE(review): hdr is not referenced in any line visible here
225 uta_mhdr_t* hdr; // convenience pointer
228 if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
230 free( msg->tp_buf ); // caller doesn't want it -- future put this on an accumulation ring
233 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
234 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
235 return NULL; // this used to exit, but that seems wrong
// wipe the mbuf, then set the few fields that must not be zero
239 memset( msg, 0, sizeof( *msg ) );
241 msg->cookie = 0x4942;
242 msg->sub_id = UNSET_SUBID;
243 msg->mtype = UNSET_MSGTYPE;
246 msg->len = -1; // no payload; invalid len
252 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
258 This accepts a message with the assumption that only the tp_buf pointer is valid. It
259 sets all of the various header/payload/xaction pointers in the mbuf to the proper
260 spot in the transport layer buffer. The len in the header is assumed to be the
261 allocated len (a receive buffer that nng created);
263 The alen parm is the assumed allocated length; assumed because it's a value likely
264 to have come from si receive and the actual alloc len might be larger, but we
265 can only assume this is the total usable space. Because we are managing a transport
266 header in the first n bytes of the real msg, we must adjust this length down by the
267 size of the tp header (for testing 50 bytes, but this should change to a struct if
268 we adopt this interface).
270 This function returns the message with an error state set if it detects that the
271 received message might have been truncated. Check is done here as the calculation
272 is somewhat based on header version.
// Set the mbuf's header, payload and xaction pointers, and its len/mtype/sub_id/alloc_len fields,
// from the RMR header found TP_HDR_LEN bytes into msg->tp_buf. Two header layouts are handled:
// the version 1 layout (uta_v1mhdr_t) and the current layout (uta_mhdr_t).
//	msg  - mbuf whose tp_buf is already set
//	alen - length recorded as msg->alloc_len and used for the truncation check
// If the payload length claimed by the sender exceeds alloc_len less the header length, the
// state is set to RMR_ERR_TRUNC and len is reduced to what fits.
274 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
275 uta_mhdr_t* hdr = NULL; // current header
276 uta_v1mhdr_t* v1hdr; // version 1 header
278 int hlen; // header len to use for a truncation check
280 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
282 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
// a raw (unconverted) value of 1 identifies a v1 sender; anything else is converted with ntohl()
284 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
286 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
288 ver = ntohl( v1hdr->rmr_ver );
// ---- version 1 layout: fixed size header, no sub-id ----
293 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
294 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
295 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
297 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
298 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
299 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
300 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
302 hlen = sizeof( uta_v1mhdr_t );
// ---- current layout: header length and payload address come from the header macros ----
305 default: // current version always lands here
306 hdr = (uta_mhdr_t *) msg->header;
307 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
308 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
310 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
311 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
312 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
313 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
314 msg->sub_id = ntohl( hdr->sub_id );
315 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// truncation check: the sender's claimed payload length must fit in what was allocated
319 if( msg->len > (msg->alloc_len - hlen ) ) {
320 msg->state = RMR_ERR_TRUNC;
321 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
328 This will clone a message into a new zero copy buffer and return the cloned message.
// Duplicate old_msg into a newly malloc'd mbuf with its own transport buffer: the header (and for
// the current header version the trace and data areas) and old_msg->len bytes of payload are
// copied, and mtype, sub_id, len, state and flags are carried over with MFL_ZEROCOPY set.
// Both allocation failures are logged as critical.
// NOTE(review): what happens after each failure log, and the final return, are not visible in this view.
330 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
331 rmr_mbuf_t* nm; // new message buffer
337 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
339 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
342 memset( nm, 0, sizeof( *nm ) );
344 mlen = old_msg->alloc_len; // length allocated before
// the new transport buffer is the old allocated length plus TP_HDR_LEN
345 if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
346 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
350 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
351 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
352 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr was set to old_msg->header just above, so in the lines visible here this
// memcpy has the same source and destination and the payload pointer lands in the OLD buffer;
// confirm whether the destination should be nm->header
354 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
355 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
358 default: // current message always caught here
360 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
361 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
365 // --- these are all version agnostic -----------------------------------
366 nm->mtype = old_msg->mtype;
367 nm->sub_id = old_msg->sub_id;
368 nm->len = old_msg->len; // length of data in the payload
369 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): the assignment of hdr is not visible in this view; confirm it is set on the version 1 path as well
371 nm->xaction = &hdr->xid[0]; // reference xaction
372 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
373 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
374 memcpy( nm->payload, old_msg->payload, old_msg->len );
380 This will clone a message with a change to the trace area in the header such that
381 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
382 The original message will be left unchanged, and a pointer to the new message is returned.
383 It is not possible to realloc buffers and change the data sizes.
// Build a copy of old_msg whose trace area is tr_len bytes long. A new mbuf and a new transport
// buffer (old alloc_len adjusted by the trace size difference, plus TP_HDR_LEN) are allocated;
// the header, the data1/data2 areas and old_msg->len bytes of payload are copied across. The
// trace area itself is not copied.
// NOTE(review): what happens after each allocation failure log, and the final return, are not
// visible in this view.
385 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
386 rmr_mbuf_t* nm; // new message buffer
391 int tr_old_len; // trace size in the old buffer
// NOTE(review): alen is not referenced in any line visible here
392 int* alen; // convenience pointer to set total xmit len FIX ME!
393 int tpb_len; // total transmit buffer len (user space, rmr header and tp header)
395 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
397 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
400 memset( nm, 0, sizeof( *nm ) );
402 hdr = old_msg->header;
403 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
405 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
406 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
408 tpb_len = mlen + TP_HDR_LEN;
409 if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
// note: the value logged here is the ENOMEM constant, not the size requested
410 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
// zero the whole buffer and stamp a recognisable pattern at the front
414 memset( nm->tp_buf, 0, tpb_len );
415 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // DEBUGGING do NOT use $ in this string!!
418 insert_mlen( (uint32_t) tpb_len, nm->tp_buf ); // this len will likely be reset on send to shrink
420 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
422 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
423 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr was set to old_msg->header just above, so in the lines visible here this
// memcpy has the same source and destination and the payload pointer lands in the OLD buffer;
// confirm whether the destination should be nm->header
425 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
426 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
429 default: // current message version always caught here
431 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
// the new trace length must be in the header before the DATA1/DATA2/PAYLOAD address macros are applied to it
432 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
434 if( RMR_D1_LEN( hdr ) ) {
435 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
437 if( RMR_D2_LEN( hdr ) ) {
438 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
441 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
445 // --- these are all version agnostic -----------------------------------
446 nm->mtype = old_msg->mtype;
447 nm->sub_id = old_msg->sub_id;
448 nm->len = old_msg->len; // length of data in the payload
449 nm->alloc_len = mlen; // length of allocated payload
451 nm->xaction = &hdr->xid[0]; // reference xaction
452 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
453 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
454 memcpy( nm->payload, old_msg->payload, old_msg->len );
460 Realloc the message such that the payload is at least payload_len bytes.
461 The clone and copy options affect what portion of the original payload is copied to
462 the reallocated message, and whether or not the original payload is lost after the
463 reallocation process has finished.
466 The entire payload from the original message will be copied to the reallocated
470 Only the header (preserving return to sender information, message type, etc)
471 is preserved after reallocation; the payload used length is set to 0 and the
472 payload is NOT initialised/cleared.
475 The original message is preserved and a completely new message buffer and payload
476 are allocated (even if the size given is the same). A pointer to the new message
477 buffer is returned and it is the user application's responsibility to manage the
478 old buffer (e.g. free when not needed).
481 The old payload will be lost after reallocation. The message buffer pointer which
482 is returned will likely reference the same structure (don't depend on that).
486 If the message is not a message which was received, the mtype, sub-id, length values in the
487 RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
488 mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
489 resetting information in the mbuffer, this function will reset the mbuf values from the current
490 settings and NOT from the copied RMR header in transport buffer.
// Give a message a transport buffer whose payload area holds at least payload_len bytes.
//	old_msg     - message to resize; rejected when NULL
//	payload_len - required payload size; rejected when <= 0
//	copy        - when true the old payload is copied along with the header, otherwise only the header
//	clone       - when true a separate mbuf is allocated; the old transport buffer is freed only
//	              when not cloning
// The mtype, sub-id, len and rts_fd of old_msg are saved on entry and re-applied afterwards
// because ref_tpbuf() reloads the mbuf from the header that was copied.
// NOTE(review): the return statements are not visible in this view.
492 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
493 rmr_mbuf_t* nm = NULL; // new message buffer when cloning
495 uta_mhdr_t* omhdr; // old message header
// NOTE(review): tr_old_len is not referenced in any line visible here
496 int tr_old_len; // tr size in new buffer
497 int old_psize = 0; // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
498 int hdr_len = 0; // length of RMR and transport headers in old msg
499 void* old_tp_buf; // pointer to the old tp buffer
500 int free_tp = 1; // free the transport buffer (old) when done (when not cloning)
501 int old_mt; // msg type and sub-id from the message passed in
504 int old_rfd; // rts file descriptor from old message
506 if( old_msg == NULL || payload_len <= 0 ) {
511 old_mt = old_msg->mtype; // preserve mbuf info
512 old_sid = old_msg->sub_id;
513 old_len = old_msg->len;
514 old_rfd = old_msg->rts_fd;
// NOTE(review): old_msg->header is handed to RMR_HDR_LEN() with no nil check visible in this view
516 old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN); // user payload size in orig message
518 if( !clone && payload_len <= old_psize ) { // not cloning and old is large enough; nothing to do
519 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
523 hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN; // with SI we manage the transport header; must include in len
524 old_tp_buf = old_msg->tp_buf;
// ---- clone path: a separate mbuf is allocated and inherits the return-to-sender fd ----
527 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
530 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
532 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
535 memset( nm, 0, sizeof( *nm ) );
536 nm->rts_fd = old_rfd; // this is managed only in the mbuf; dup now
541 omhdr = old_msg->header;
// the new buffer is never smaller than the old payload so that a full copy always fits
542 mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
544 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
545 if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
546 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
551 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old
552 SET_HDR_LEN( nm->header );
554 if( copy ) { // if we need to copy the old payload too
555 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
556 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
557 } else { // just need to copy header
558 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
559 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
562 ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
// ---- fix up the mbuf fields that ref_tpbuf() reloaded from the copied header ----
565 nm->mtype = -1; // didn't copy payload, so mtype, sub-id, and rts fd are invalid
567 nm->len = 0; // and len is 0
569 nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message
571 nm->sub_id = old_sid;
575 free( old_tp_buf ); // we did not clone, so free b/c no references
582 For SI95 based transport all receives are driven through the threaded
583 ring and thus this function should NOT be called. If it is we will panic
584 and abort straight away.
// Should never be called. The only visible action is to write a diagnostic to stderr; neither
// parameter is used in the lines shown.
// NOTE(review): the termination/return lines that follow the message are not visible in this view.
586 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
588 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" );
595 This does the hard work of actually sending the message to the given socket. On success,
596 a new message struct is returned. On error, the original msg is returned with the state
597 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
598 buffer will not be allocated and returned (mostly for call() internal processing since
599 the return message from call() is a received buffer, not a new one).
601 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
602 validation has been done prior.
604 When msg->state is not ok, this function must set tp_state in the message as some API
605 functions return the message directly and do not propagate errno into the message.
// Write msg to the transport on nn_sock.
//	ctx     - context (si_ctx for the transport call, my_name/my_ip for the source overlay)
//	msg     - message to send; its header is updated in place before the send
//	nn_sock - socket/fd handed to SIsendt()
//	retries - the send is repeated only while this is > 0 and the transport reports SI_ERR_BLOCKED
// When msg->state is RMR_OK after the loop, the mbuf is either recycled through alloc_zcmsg()
// with the same trace length and returned, or (MFL_NOALLOC set) freed. Otherwise the state is
// mapped to RMR_ERR_RETRY or RMR_ERR_SENDFAILED.
// NOTE(review): the lines which set msg->state inside the loop, and the return statements other
// than the alloc_zcmsg() one, are not visible in this view.
607 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
610 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
611 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
612 int tot_len; // total send length (hdr + user data + tp header)
614 // future: ensure that application did not overrun the XID buffer; last byte must be 0
616 hdr = (uta_mhdr_t *) msg->header;
617 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
618 hdr->sub_id = htonl( msg->sub_id );
619 hdr->plen = htonl( msg->len );
620 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
622 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
623 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
624 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
// send only the used portion of the buffer, but never more than was allocated
635 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
636 if( tot_len > msg->alloc_len ) {
637 tot_len = msg->alloc_len; // likely bad length from user :(
639 insert_mlen( tot_len, msg->tp_buf ); // shrink to fit
641 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
642 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
// ---- send loop: repeats while state is non-zero and retries remain (see the while below) ----
644 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
645 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: error!! sent state=%d\n", state );
647 if( retries > 0 && state == SI_ERR_BLOCKED ) {
// spin up to spin_retries times before yielding the CPU
648 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
650 if( retries > 0 ) { // only if we'll loop through again
651 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
656 state = 0; // don't loop
659 if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
664 } while( state && retries > 0 );
666 if( msg->state == RMR_OK ) { // successful send
667 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
668 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
670 rmr_free_msg( msg ); // not wanting a message back, trash this one
673 } else { // send failed -- return original message
// NOTE(review): 98 is an unnamed constant; its meaning cannot be determined from this view
674 if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
676 msg->state = RMR_ERR_RETRY; // errno will have nano reason
678 msg->state = RMR_ERR_SENDFAILED;
// NOTE(review): strerror() is given msg->state, which was just set to an RMR_ERR_* value rather than an errno -- confirm intent
681 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
688 send message with maximum timeout.
689 Accept a message and send it to an endpoint based on message type.
690 If NNG reports that the send attempt timed out, or should be retried,
691 RMr will retry for approximately max_to microseconds; rounded to the next
694 Allocates a new message buffer for the next send. If a message type has
695 more than one group of endpoints defined, then the message will be sent
696 in round robin fashion to one endpoint in each group.
698 An endpoint will be looked up in the route table using the message type and
699 the subscription id. If the subscription id is "UNSET_SUBID", then only the
700 message type is used. If the initial lookup, with a subid, fails, then a
701 second lookup using just the mtype is tried.
703 When msg->state is not OK, this function must set tp_state in the message as
704 some API functions return the message directly and do not propagate errno into
707 CAUTION: this is a non-blocking send. If the message cannot be sent, then
708 it will return with an error and errno set to eagain. If the send is
709 a limited fanout, then the returned status is the status of the last
// Route and send a message.
//	vctx   - context pointer (cast to uta_ctx_t*); must not be NULL
//	msg    - message to send; must not be NULL and must have a header
//	max_to - passed to send_msg() as its retry count; may be replaced with ctx->send_retries
// The route table entry is looked up with the message's sub-id and type (fallback to type only
// allowed). For an entry with round robin groups one endpoint per group is selected and, while
// more groups remain, a clone is sent so that the message stays available for the next group;
// otherwise the endpoint is selected with epsock_meid(). Endpoint send counters are updated from
// the resulting message state. Returns the message carrying the status of the last/only attempt.
// NOTE(review): several control lines (else arms, closing braces, counter updates, early
// returns) are not visible in this view; comments below describe only what is shown.
713 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
714 endpoint_t* ep; // end point that we're attempting to send to
715 rtable_ent_t* rte; // the route table entry which matches the message key
716 int nn_sock; // endpoint socket (fd in si case) for send
718 int group; // selected group to get socket for
719 int send_again; // true if the message must be sent again
720 rmr_mbuf_t* clone_m; // cloned message for an nth send
721 int sock_ok; // got a valid socket from round robin select
723 int ok_sends = 0; // track number of ok sends
// ---- argument validation ----
725 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
726 errno = EINVAL; // if msg is null, this is their clue
// NOTE(review): presumably guarded by a msg != NULL test on a line not visible here
728 msg->state = RMR_ERR_BADARG;
729 errno = EINVAL; // must ensure it's not eagain
730 msg->tp_state = errno;
735 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
736 if( msg->header == NULL ) {
737 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
738 msg->state = RMR_ERR_NOHDR;
739 errno = EBADMSG; // must ensure it's not eagain
740 msg->tp_state = errno;
// NOTE(review): the condition under which the caller's max_to is replaced is not visible here
745 max_to = ctx->send_retries; // convert to retries
// ---- route lookup ----
748 if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
749 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
750 msg->state = RMR_ERR_NOENDPT;
751 errno = ENXIO; // must ensure it's not eagain
752 msg->tp_state = errno;
753 return msg; // caller can resend (maybe) or free
// ---- send loop: one pass per round robin group ----
756 send_again = 1; // force loop entry
757 group = 0; // always start with group 0
758 while( send_again ) {
759 if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed
760 sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
762 sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
766 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
767 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
771 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
// more groups to follow: send the original with MFL_NOALLOC and carry on with the clone
773 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
774 if( clone_m == NULL ) {
775 msg->state = RMR_ERR_SENDFAILED;
777 msg->tp_state = errno;
778 rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
782 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
783 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
784 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
786 if( msg != NULL ) { // returned message indicates send error of some sort
787 rmr_free_msg( msg ); // must ditchone; pick msg so we don't have to unfiddle flags
791 msg = clone_m; // clone will be the next to send
// last (or only) group: a normal send which hands back a message for the caller
795 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
798 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" );
// per-endpoint statistics, keyed on the state of the message that came back
803 if( ep != NULL && msg != NULL ) {
804 switch( msg->state ) {
806 ep->scounts[EPSC_GOOD]++;
810 ep->scounts[EPSC_TRANS]++;
814 ep->scounts[EPSC_FAIL]++;
815 uta_ep_failed( ep ); // sending to ep failed; set up to reconnect
// no usable socket for this route entry
820 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
821 msg->state = RMR_ERR_NOENDPT;
// ---- final status ----
826 if( msg ) { // call functions don't get a buffer back, so a nil check is required
827 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
828 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
832 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
834 msg->tp_state = errno;
837 return msg; // last message caries the status of last/only send attempt
842 A generic wrapper to the real send to keep wormhole stuff agnostic.
843 We assume the wormhole function vetted the buffer so we don't have to.
845 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
846 return send_msg( ctx, msg, ep->nn_sock, -1 );