1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_si_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
// Debug helper: writes label and the pointer value p, then the 40 bytes starting at p as hex, to stderr.
// NOTE(review): always reads exactly 40 bytes -- callers must pass a buffer at least that long.
41 static void dump_40( char *p, char* label ) {
45 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
47 for( i = 0; i < 40; i++ ) {
48 fprintf( stderr, "%02x ", (unsigned char) *(p+i) ); // cast so bytes >= 0x80 print as two hex digits rather than sign-extended
50 fprintf( stderr, "\n" );
54 Translates the nng state passed in to one of ours that is suitable to put
55 into the message, and sets errno to something that might be useful.
56 If we don't have a specific RMr state, then we return the default (e.g.
59 The addition of the connection shut error code to the switch requires
60 that the NNG version at commit e618abf8f3db2a94269a (or after) be
61 used for compiling RMR.
// Translates a transport (SI) state into an RMR state and sets errno to something useful;
// def_state is returned when there is no specific RMR state for the value passed in.
63 static inline int xlate_si_state( int state, int def_state ) {
102 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
103 a new message struct as well. Size is the size of the zc buffer to allocate (not
104 including our header). If size is 0, then the buffer allocated is the size previously
105 allocated (if msg is !nil) or the default size given at initialisation.
107 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
108 the context value is used.
110 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
111 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Returns a zero-copy (sendable) message whose RMR header has been freshly initialised.
//   msg   - mbuf to reuse; when NULL one is taken from ctx->zcb_mring, or malloc'd if the ring is empty.
//   size  - payload bytes wanted; <= 0 means ctx->max_plen.
//   state - copied into msg->state.
//   trlo  - trace length used when sizing the buffer; <= 0 means ctx->trace_data_len.
// Returns NULL only if the mbuf struct itself cannot be malloc'd; failure to allocate tp_buf aborts.
114 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
115 size_t mlen = -1; // size of the transport buffer that we'll allocate
116 uta_mhdr_t* hdr; // convenience pointer
117 int tr_len; // trace data len (default or override)
118 int* alen; // convenience pointer to set allocated len
119 //int tpb_len; // transport buffer total len
120 static int logged = 0;
122 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
// tp_buf size = transport header + RMR header + trace + d1 + d2 + payload
124 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
125 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
126 mlen = sizeof( char ) * (mlen + TP_HDR_LEN); // finally add the transport header len
// no msg given and nothing on the ring: build a brand new (zeroed) mbuf
128 if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
129 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
131 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
132 return NULL; // we used to exit -- that seems wrong
134 memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below
135 } else { // user message or message from the ring
136 if( mlen > msg->alloc_len ) { // current allocation is too small
137 msg->alloc_len = 0; // force tp_buffer realloc below
142 mlen = msg->alloc_len; // msg given, allocate the same size as before
// alloc_len == 0 here means there is no usable tp_buf (new mbuf, or the existing one was too small)
147 if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
148 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
149 abort( ); // toss out a core file for this
153 memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitialised use if we don't set
154 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher (34 bytes, no NUL) at the start of the transport header
// The first int of the transport header carries the buffer length; send_msg later overwrites it with the actual send length.
156 alen = (int *) msg->tp_buf;
157 *alen = mlen; // FIX ME: need a struct to go in these first bytes, not just dummy len
159 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
160 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
161 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) { // header was just derived from tp_buf, so this is effectively always true
162 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
163 hdr->sub_id = htonl( UNSET_SUBID );
164 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): the header records ctx->trace_data_len, but mlen was sized with tr_len (the trlo override).
// When trlo > 0 differs from ctx->trace_data_len, header-derived offsets (e.g. PAYLOAD_ADDR) disagree with
// the allocation; if tr_len is the smaller of the two, the payload area could extend past the buffer.
// Presumably this should be tr_len -- confirm before changing.
165 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
166 SET_HDR_D1_LEN( hdr, ctx->d1_len );
167 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
169 msg->len = 0; // length of data in the payload
170 msg->alloc_len = mlen; // length of the whole tp buffer (transport header + rmr header + trace/data + payload)
171 msg->sub_id = UNSET_SUBID;
172 msg->mtype = UNSET_MSGTYPE;
173 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
174 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
175 msg->state = state; // fill in caller's state (likely the state of the last operation)
176 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message (|= keeps any flags already set on a reused msg)
177 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
// NOTE(review): strncpy leaves the destination unterminated when the source is RMR_MAX_SRC bytes or longer.
178 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
179 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
181 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
187 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
188 transport receive should allocate that on its own.
// Returns an mbuf with no transport buffer attached; the receive path is expected to supply tp_buf itself.
// An mbuf is reused from ctx->zcb_mring when one is available (the tp_buf it still holds is freed);
// otherwise one is malloc'd. Returns NULL if that malloc fails.
190 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
192 uta_mhdr_t* hdr; // convenience pointer
195 if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
197 free( msg->tp_buf ); // caller doesn't want it -- future put this on an accumulation ring
200 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
201 fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
202 return NULL; // this used to exit, but that seems wrong
206 memset( msg, 0, sizeof( *msg ) );
208 msg->sub_id = UNSET_SUBID;
209 msg->mtype = UNSET_MSGTYPE;
212 msg->len = -1; // no payload; invalid len
// the ring is remembered on the message because the free api has no context to find it otherwise
218 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
224 This accepts a message with the assumption that only the tp_buf pointer is valid. It
225 sets all of the various header/payload/xaction pointers in the mbuf to the proper
226 spot in the transport layer buffer. The len in the header is assumed to be the
227 allocated len (a receive buffer that nng created).
229 The alen parm is the assumed allocated length; assumed because it's a value likely
230 to have come from si receive and the actual alloc len might be larger, but we
231 can only assume this is the total usable space. Because we are managing a transport
232 header in the first n bytes of the real msg, we must adjust this length down by the
233 size of the tp header (for testing 50 bytes, but this should change to a struct if
234 we adopt this interface).
236 This function returns the message with an error state set if it detects that the
237 received message might have been truncated. Check is done here as the calculation
238 is somewhat based on header version.
// Wires msg's header, payload and xaction pointers into the tp_buf it already holds (the RMR header begins
// TP_HDR_LEN bytes in), decodes mtype, sub_id and the sender's payload length from network byte order
// according to the header version, and records alen as alloc_len. If the claimed payload length exceeds
// alloc_len minus the header length, msg->state is set to RMR_ERR_TRUNC and msg->len is clamped.
240 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
241 uta_mhdr_t* hdr = NULL; // current header
242 uta_v1mhdr_t* v1hdr; // version 1 header
244 int hlen; // header len to use for a truncation check
246 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN; // FIX ME: hard 50 needs to be some kind of tp header struct
248 // do NOT reduce alen any more. alen must be TP_HEADER + RMR_HEADER + user space
249 // get payload size will do the right thing and subtract TP_HEADER and RMR_HEADER lengths
250 //alen -= 50; // actual length of "rmr space"
252 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
// a raw (host order) value of 1 is rewritten in network order in place, so later reads and clones see it correctly
254 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
256 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
258 ver = ntohl( v1hdr->rmr_ver );
// version 1 header: fixed-size struct, payload immediately follows it, and there is no sub_id
263 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
264 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
265 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
267 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
268 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
269 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
270 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
272 hlen = sizeof( uta_v1mhdr_t );
275 default: // current version always lands here
276 hdr = (uta_mhdr_t *) msg->header;
277 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
278 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
280 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
281 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
282 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
283 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
284 msg->sub_id = ntohl( hdr->sub_id );
285 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): alloc_len includes the transport header bytes (see the alen comment above), and for current
// headers hlen covers only the fixed header, not the trace/d1/d2 areas. This check may therefore accept a
// len slightly larger than the real payload space -- confirm whether full payload-offset accounting is intended.
289 if( msg->len > (msg->alloc_len - hlen ) ) {
290 msg->state = RMR_ERR_TRUNC;
291 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
298 This will clone a message into a new zero copy buffer and return the cloned message.
// Returns a new zero-copy mbuf with its own tp_buf. The tp_buf holds a copy of old_msg's RMR header (for
// current-version headers, also its trace/d1/d2 data) followed by old_msg->len payload bytes.
// mtype, sub_id, len and state are carried over; flags are copied with MFL_ZEROCOPY forced on.
// Allocation failures are reported on stderr. The leading length int of tp_buf is not set here; send_msg
// writes it before each send.
300 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
301 rmr_mbuf_t* nm; // new message buffer
307 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
309 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
312 memset( nm, 0, sizeof( *nm ) );
314 mlen = old_msg->alloc_len; // length allocated before
// NOTE(review): an alloc_len set by alloc_zcmsg already includes TP_HDR_LEN, so this allocates TP_HDR_LEN extra bytes.
315 if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
316 fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
320 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
321 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
322 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): on this (version 1) path v1hdr still points at old_msg->header, so the memcpy below copies the
// old header onto itself (overlapping memcpy is undefined) and nm->payload lands inside old_msg's buffer.
// hdr, used later for nm->xaction, is also not visibly assigned on this path. Presumably v1hdr (and hdr)
// should reference nm->header here -- confirm.
324 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
325 nm->payload = (void *) v1hdr + sizeof( *v1hdr ); // note: arithmetic on void* is a GCC extension
328 default: // current message always caught here
// hdr is expected to reference nm->header at this point
330 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
331 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
335 // --- these are all version agnostic -----------------------------------
336 nm->mtype = old_msg->mtype;
337 nm->sub_id = old_msg->sub_id;
338 nm->len = old_msg->len; // length of data in the payload
339 nm->alloc_len = mlen; // same value as old_msg->alloc_len (the buffer allocated above is TP_HDR_LEN bytes larger)
341 nm->xaction = hdr->xid; // reference xaction
342 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
343 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
344 memcpy( nm->payload, old_msg->payload, old_msg->len );
350 This will clone a message with a change to the trace area in the header such that
351 it will be tr_len passed in. The trace data is not copied from the original message; the new trace area is left zero-filled.
352 The original message will be left unchanged, and a pointer to the new message is returned.
353 It is not possible to realloc buffers and change the data sizes.
// Returns a copy of old_msg whose trace area is resized to tr_len bytes. The buffer grows or shrinks by
// (tr_len - old trace length). The fixed header, d1/d2 data and payload are copied, but the trace bytes are
// not: the new tp_buf is zero-filled first. old_msg itself is not modified.
355 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
356 rmr_mbuf_t* nm; // new message buffer
361 int tr_old_len; // trace size in the old buffer
362 int* alen; // convenience pointer to set total xmit len FIX ME!
363 int tpb_len; // total transmit buffer len (user space, rmr header and tp header)
365 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
367 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
370 memset( nm, 0, sizeof( *nm ) );
372 hdr = old_msg->header;
373 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
375 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
376 if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
// NOTE(review): an alloc_len set by alloc_zcmsg already includes TP_HDR_LEN, so it is added a second time here.
378 tpb_len = mlen + TP_HDR_LEN;
379 if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
// NOTE(review): this prints the ENOMEM constant; the requested size (tpb_len) was presumably intended.
380 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
383 memset( nm->tp_buf, 0, tpb_len );
384 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // DEBUGGING
// the first int of the transport header carries the total buffer length
385 alen = (int *) nm->tp_buf;
386 *alen = tpb_len; // FIX ME: need a struct to go in these first bytes, not just dummy len
388 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
390 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
391 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): on this (version 1) path v1hdr still points at old_msg->header, making the memcpy a self-copy
// (undefined for overlapping memcpy) and leaving nm->payload inside old_msg's buffer. hdr also still references
// old_msg->header, so nm->xaction below would point into the old message. Confirm and point these at nm->header.
393 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
394 nm->payload = (void *) v1hdr + sizeof( *v1hdr ); // note: arithmetic on void* is a GCC extension
397 default: // current message version always caught here
// hdr must reference nm->header by this point (it was set to old_msg->header earlier)
399 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
400 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
402 if( RMR_D1_LEN( hdr ) ) {
403 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
405 if( RMR_D2_LEN( hdr ) ) {
406 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
409 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
413 // --- these are all version agnostic -----------------------------------
414 nm->mtype = old_msg->mtype;
415 nm->sub_id = old_msg->sub_id;
416 nm->len = old_msg->len; // length of data in the payload
417 nm->alloc_len = mlen; // old alloc_len adjusted by the trace-length delta
419 nm->xaction = hdr->xid; // reference xaction
420 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
421 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
422 memcpy( nm->payload, old_msg->payload, old_msg->len );
428 For SI95 based transport all receives are driven through the threaded
429 ring and thus this function should NOT be called. If it is we will panic
430 and abort straight away.
// Must never be reached with the SI95 transport, where receives arrive through the threaded ring.
// Writes a diagnostic to stderr; the abort described for this path is expected to follow.
432 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
434 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" );
441 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
442 message buffer cannot be used to send, and the length information may or may
443 not be correct (it is set to the length received which might be more than the
444 bytes actually in the payload).
446 Mostly this supports the route table collector, but could be extended with an
447 API external function.
// Intended to receive a raw (header-less) message from a non-RMR sender. Per the FIXME it is not implemented
// for SI95, and the body below may be disabled.
// NOTE(review): if this body is ever enabled as shown, two problems apply. The receive call is commented out.
// tp_buf comes from alloc_zcmsg (plain malloc) yet is passed to nng_msg_len() and nng_msg_body() as if it
// were an nng message. Both must be reworked before use.
449 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
452 FIXME: not implemented yet
454 rmr_mbuf_t* msg = NULL; // msg received
455 size_t rsize; // nng needs to write back the size received... grrr
// NOTE(review): alloc_zcmsg aborts only when tp_buf cannot be allocated; it returns NULL when the mbuf itself
// cannot be malloc'd, so a NULL check would be needed here.
460 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
463 //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
464 if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
467 rsize = nng_msg_len( msg->tp_buf );
469 // do NOT use ref_tpbuf() here! Must fill these in manually.
470 msg->header = nng_msg_body( msg->tp_buf );
471 msg->len = rsize; // len is the number of bytes received
472 msg->alloc_len = rsize;
473 msg->mtype = UNSET_MSGTYPE; // raw message has no type
474 msg->sub_id = UNSET_SUBID; // nor a subscription id
476 msg->flags = MFL_RAW;
477 msg->payload = msg->header; // payload is the whole thing; no header
480 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
487 This does the hard work of actually sending the message to the given socket. On success,
488 a new message struct is returned. On error, the original msg is returned with the state
489 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
490 buffer will not be allocated and returned (mostly for call() internal processing since
491 the return message from call() is a received buffer, not a new one).
493 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
494 validation has been done prior.
496 When msg->state is not ok, this function must set tp_state in the message as some API
497 functions return the message directly and do not propagate errno into the message.
// Sends msg on socket nn_sock via SIsendt.
// Before sending:
//   - mtype, sub_id and plen are written into the header in network byte order;
//   - our source name and IP are written too when MFL_ADDSRC is set.
// Only the used part of tp_buf is sent: transport header + RMR header/trace/data + msg->len payload bytes.
// That total is stored in the first int of tp_buf. SI_ERR_BLOCKED is retried only while retries > 0.
// On success: returns msg re-initialised by alloc_zcmsg as the next sendable buffer. When MFL_NOALLOC is set,
// msg is freed instead (mtosend_msg expects no buffer back in that case).
// On failure: the original msg comes back with state RMR_ERR_RETRY or RMR_ERR_SENDFAILED.
499 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
502 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
503 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
504 int tot_len; // total send length (hdr + user data + tp header)
506 // future: ensure that application did not overrun the XID buffer; last byte must be 0
508 hdr = (uta_mhdr_t *) msg->header;
509 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
510 hdr->sub_id = htonl( msg->sub_id );
511 hdr->plen = htonl( msg->len );
512 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
514 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
515 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
516 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
527 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
528 *((int*) msg->tp_buf) = tot_len;
530 if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
531 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
533 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
534 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg: error!! sent state=%d\n", state );
536 if( retries > 0 && state == SI_ERR_BLOCKED ) {
537 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
539 if( retries > 0 ) { // only if we'll loop through again
540 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
545 state = 0; // don't loop
548 if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
553 } while( state && retries > 0 );
555 if( msg->state == RMR_OK ) { // successful send
556 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
557 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg (reuses msg and, when large enough, its tp_buf)
559 rmr_free_msg( msg ); // not wanting a message back, trash this one
562 } else { // send failed -- return original message
// NOTE(review): 98 is a hard-coded transport code; a named SI constant would be clearer (see the FIX ME).
563 if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
565 msg->state = RMR_ERR_RETRY; // errno will have nano reason
567 msg->state = RMR_ERR_SENDFAILED;
// NOTE(review): msg->state holds an RMR state here, not an errno value, so the strerror() text is misleading.
570 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
577 send message with maximum timeout.
578 Accept a message and send it to an endpoint based on message type.
579 If NNG reports that the send attempt timed out, or should be retried,
580 RMr will retry for approximately max_to microseconds; rounded to the next
583 Allocates a new message buffer for the next send. If a message type has
584 more than one group of endpoints defined, then the message will be sent
585 in round robin fashion to one endpoint in each group.
587 An endpoint will be looked up in the route table using the message type and
588 the subscription id. If the subscription id is "UNSET_SUBID", then only the
589 message type is used. If the initial lookup, with a subid, fails, then a
590 second lookup using just the mtype is tried.
592 When msg->state is not OK, this function must set tp_state in the message as
593 some API functions return the message directly and do not propagate errno into
596 CAUTION: this is a non-blocking send. If the message cannot be sent, then
597 it will return with an error and errno set to eagain. If the send is
598 a limited fanout, then the returned status is the status of the last
// Sends msg to the endpoint(s) routed for its mtype and sub_id. uta_get_rte is called with TRUE to allow
// fallback to an mtype-only key.
// When the route entry has more round-robin groups, each send except the last goes out on a clone, so a
// buffer survives for the next group.
// max_to is converted to a retry count (ctx->send_retries) for send_msg.
// Returns the buffer send_msg hands back for the final send, or msg with state and tp_state set on error.
602 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
603 endpoint_t* ep; // end point that we're attempting to send to
604 rtable_ent_t* rte; // the route table entry which matches the message key
605 int nn_sock; // endpoint socket (fd in si case) for send
607 int group; // selected group to get socket for
608 int send_again; // true if the message must be sent again
609 rmr_mbuf_t* clone_m; // cloned message for an nth send
610 int sock_ok; // got a valid socket from round robin select
612 int ok_sends = 0; // track number of ok sends
// NOTE(review): msg is dereferenced just below although this condition is also true when msg == NULL;
// confirm that a NULL guard protects those assignments.
614 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
615 errno = EINVAL; // if msg is null, this is their clue
617 msg->state = RMR_ERR_BADARG;
618 errno = EINVAL; // must ensure it's not eagain
619 msg->tp_state = errno;
624 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
625 if( msg->header == NULL ) {
626 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
627 msg->state = RMR_ERR_NOHDR;
628 errno = EBADMSG; // must ensure it's not eagain
629 msg->tp_state = errno;
634 max_to = ctx->send_retries; // convert to retries
637 if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
638 if( ctx->flags & CTXFL_WARN ) {
639 fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
641 msg->state = RMR_ERR_NOENDPT;
642 errno = ENXIO; // must ensure it's not eagain
643 msg->tp_state = errno;
644 return msg; // caller can resend (maybe) or free
647 send_again = 1; // force loop entry
648 group = 0; // always start with group 0
649 while( send_again ) {
650 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups
652 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
653 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
657 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
659 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
660 if( clone_m == NULL ) {
661 msg->state = RMR_ERR_SENDFAILED;
663 msg->tp_state = errno;
664 if( ctx->flags & CTXFL_WARN ) {
665 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
670 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
// with MFL_NOALLOC, send_msg frees msg on success instead of building a new buffer; the clone becomes the next msg
671 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
672 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
674 if( msg != NULL ) { // returned message indicates send error of some sort
675 rmr_free_msg( msg ); // must ditch one; pick msg so we don't have to unfiddle flags
679 msg = clone_m; // clone will be the next to send
682 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
685 fprintf( stderr, "[DBUG] mtosend_msg: send returned nil message!\n" );
// per-endpoint send counters (good / transient / failed); a failed send marks the endpoint for reconnect
690 if( ep != NULL && msg != NULL ) {
691 switch( msg->state ) {
693 ep->scounts[EPSC_GOOD]++;
697 ep->scounts[EPSC_TRANS]++;
701 ep->scounts[EPSC_FAIL]++;
702 uta_ep_failed( ep ); // sending to ep failed; set up to reconnect
708 if( ctx->flags & CTXFL_WARN ) {
709 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
712 msg->state = RMR_ERR_NOENDPT;
717 if( msg ) { // call functions don't get a buffer back, so a nil check is required
718 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
719 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
723 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
725 msg->tp_state = errno;
728 return msg; // last message carries the status of last/only send attempt
733 A generic wrapper to the real send to keep wormhole stuff agnostic.
734 We assume the wormhole function vetted the buffer so we don't have to.
// Sends msg directly to endpoint ep's socket (wormhole path); the buffer is assumed to be vetted by the caller.
// retries is passed as -1: every retry branch in send_msg requires retries > 0, so exactly one attempt is made.
736 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
737 return send_msg( ctx, msg, ep->nn_sock, -1 );