1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_si_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
/*
	Debugging aid. Writes a header line containing the label and the address of p
	to stderr, followed by the first 40 bytes at p as two digit hex values on a
	single line.

	p     - start of the area to dump; at least 40 bytes must be readable. When nil
	        only the header line is written.
	label - text for the header line; when nil the header line is skipped.
*/
static void dump_40( char *p, char* label ) {
	int i;

	if( label != NULL ) {
		fprintf( stderr, ">>>>> %s p=%p\n", label, (void *) p );		// %p is only defined for void pointers
	}

	if( p == NULL ) {					// nothing that can safely be dumped
		return;
	}

	for( i = 0; i < 40; i++ ) {
		fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
	}
	fprintf( stderr, "\n" );
}
54 Translates the nng state passed in to one of ours that is suitable to put
55 into the message, and sets errno to something that might be useful.
56 If we don't have a specific RMr state, then we return the default (e.g.
59 The addition of the connection shut error code to the switch requires
60 that the NNG version at commit e618abf8f3db2a94269a (or after) be
61 used for compiling RMR.
// state is the transport level state to translate; def_state is the caller's fallback value.
// NOTE(review): the body of this function is not visible in this view of the file, so the
// individual state mappings and the errno values it sets could not be confirmed here.
63 static inline int xlate_si_state( int state, int def_state ) {
102 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
103 a new message struct as well. Size is the size of the zc buffer to allocate (not
104 including our header). If size is 0, then the buffer allocated is the size previously
105 allocated (if msg is !nil) or the default size given at initialisation.
107 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
108 the context value is used.
110 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
111 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Prepares a sendable message: computes the transport buffer size, obtains an mbuf (the caller's,
// one pulled from ctx->zcb_mring, or a freshly malloc'd one), allocates the transport buffer when
// needed and initialises the RMR header and the mbuf fields.
//   ctx   - context supplying default lengths, the mbuf ring and our name/ip strings
//   msg   - mbuf to reuse, or nil to have one taken from the ring or allocated
//   size  - payload bytes wanted; <= 0 selects ctx->max_plen
//   state - value stored into msg->state
//   trlo  - trace length override; <= 0 selects ctx->trace_data_len
// Returns nil when the mbuf cannot be allocated; aborts when the transport buffer cannot.
// NOTE(review): the final return is not visible in this view -- presumably msg is returned.
114 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
// the -1 initialiser is overwritten by the size computation below before mlen is used
115 size_t mlen = -1; // size of the transport buffer that we'll allocate
116 uta_mhdr_t* hdr; // convenience pointer
117 int tr_len; // trace data len (default or override)
118 int* alen; // convenience pointer to set allocated len
120 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
122 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
123 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
124 mlen = sizeof( char ) * (mlen + TP_HDR_LEN); // finally add the transport header len
// with no caller supplied mbuf, a recycled one from the ring is preferred over malloc
126 if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
127 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
129 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
130 return NULL; // we used to exit -- that seems wrong
132 memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below
133 } else { // user message or message from the ring
134 if( mlen > msg->alloc_len ) { // current allocation is too small
135 msg->alloc_len = 0; // force tp_buffer realloc below
140 mlen = msg->alloc_len; // msg given, allocate the same size as before
// a transport buffer is only allocated when alloc_len is zero (new mbuf, or forced above)
145 if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
146 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
147 abort( ); // toss out a core file for this
151 memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitialised use if we don't set
152 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher
// the leading sizeof(int) bytes of the buffer (start of the eyecatcher) are replaced by the length
154 alen = (int *) msg->tp_buf;
155 *alen = mlen; // FIX ME: need a struct to go in these first bytes, not just dummy len
157 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
158 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
// NOTE(review): msg->header has already been written through by the memset above, so the nil
// test on hdr below cannot act as a guard.
159 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
160 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
161 hdr->sub_id = htonl( UNSET_SUBID );
162 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): the buffer was sized with tr_len (which honours trlo) but the header records
// ctx->trace_data_len; when trlo > 0 and differs the two disagree -- confirm this is intended.
163 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
164 SET_HDR_D1_LEN( hdr, ctx->d1_len );
165 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
167 msg->len = 0; // length of data in the payload
168 msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
169 msg->sub_id = UNSET_SUBID;
170 msg->mtype = UNSET_MSGTYPE;
171 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
172 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
173 msg->state = state; // fill in caller's state (likely the state of the last operation)
174 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
175 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
// strncpy does not terminate the destination when the source is RMR_MAX_SRC bytes or longer;
// assumes both src and srcip hold at least RMR_MAX_SRC bytes -- TODO confirm against the header struct
176 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
177 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
179 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
185 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
186 transport receive should allocate that on its own.
// Supplies an mbuf with no transport buffer attached. An mbuf is first sought on ctx->zcb_mring
// and, when one is found, its transport buffer is released; otherwise one is malloc'd.
//   ctx   - context supplying the mbuf ring
//   state - caller's state (its use is not visible in this view)
// Returns nil when a new mbuf cannot be allocated.
// NOTE(review): several lines of this function (branch structure, final return) are not visible
// here; the comments below describe only what the visible statements do.
188 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
190 uta_mhdr_t* hdr; // convenience pointer
193 if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
195 free( msg->tp_buf ); // caller doesn't want it -- future put this on an accumulation ring
198 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
199 fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
200 return NULL; // this used to exit, but that seems wrong
// zeroing the struct also clears every pointer field, including tp_buf
204 memset( msg, 0, sizeof( *msg ) );
206 msg->sub_id = UNSET_SUBID;
207 msg->mtype = UNSET_MSGTYPE;
210 msg->len = -1; // no payload; invalid len
216 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
222 This accepts a message with the assumption that only the tp_buf pointer is valid. It
223 sets all of the various header/payload/xaction pointers in the mbuf to the proper
224 spot in the transport layer buffer. The len in the header is assumed to be the
225 allocated len (a receive buffer that nng created);
227 The alen parm is the assumed allocated length; assumed because it's a value likely
228 to have come from si receive and the actual alloc len might be larger, but we
229 can only assume this is the total usable space. Because we are managing a transport
230 header in the first n bytes of the real msg, we must adjust this length down by the
231 size of the tp header (for testing 50 bytes, but this should change to a struct if
232 we adopt this interface).
234 This function returns the message with an error state set if it detects that the
235 received message might have been truncated. Check is done here as the calculation
236 is somewhat based on header version.
// Points the mbuf's header, payload and xaction fields into msg->tp_buf and loads len, mtype and
// sub_id from the header found there, handling the version 1 header layout separately from the
// current one. If the payload length claimed by the sender exceeds what the buffer can hold the
// message state is set to RMR_ERR_TRUNC and len is reduced.
//   msg  - mbuf whose tp_buf already references a received buffer
//   alen - assumed usable length of that buffer
// The function is void; the result is reported only through the fields of msg.
// NOTE(review): the switch statement, its case labels and breaks are not visible in this view.
238 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
239 uta_mhdr_t* hdr = NULL; // current header
240 uta_v1mhdr_t* v1hdr; // version 1 header
242 int hlen; // header len to use for a truncation check
// the RMR header sits TP_HDR_LEN bytes into the transport buffer
244 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
246 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
// a host order value of 1 is taken to mean a version 1 sender; it is rewritten in network order
248 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
250 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
252 ver = ntohl( v1hdr->rmr_ver );
257 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
258 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
259 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
261 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
262 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
263 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
264 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
266 hlen = sizeof( uta_v1mhdr_t );
269 default: // current version always lands here
270 hdr = (uta_mhdr_t *) msg->header;
271 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
272 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
274 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
275 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
276 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
277 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
278 msg->sub_id = ntohl( hdr->sub_id );
279 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): alloc_len is stored as alen unchanged in both visible assignments, yet the header
// starts TP_HDR_LEN bytes into the buffer; unless an unseen line adjusts it, the usable payload
// space computed here is overstated by the transport header size -- confirm.
283 if( msg->len > (msg->alloc_len - hlen ) ) {
284 msg->state = RMR_ERR_TRUNC;
285 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
292 This will clone a message into a new zero copy buffer and return the cloned message.
// Duplicates old_msg: allocates a new mbuf and a new transport buffer, copies the header area
// and the payload, and mirrors the mbuf fields (mtype, sub_id, len, state, flags).
//   old_msg - message to copy; it is read only on the current-version path
// NOTE(review): declarations, the handling after each allocation failure, the case labels and the
// final return are not visible in this view.
294 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
295 rmr_mbuf_t* nm; // new message buffer
301 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
303 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
306 memset( nm, 0, sizeof( *nm ) );
308 mlen = old_msg->alloc_len; // length allocated before
// alloc_zcmsg records an alloc_len that already includes TP_HDR_LEN, so for such messages this
// allocation is TP_HDR_LEN bytes larger than the original
309 if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
310 fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
314 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
315 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
316 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr still points at old_msg->header here, so this memcpy has identical source
// and destination and nothing reaches nm->header; the payload pointer set on the next line then
// refers to the OLD buffer. Looks like the destination should be nm->header -- confirm.
// Arithmetic on a void pointer (next line) is a compiler extension, not standard C.
318 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
319 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
322 default: // current message always caught here
// header, trace, d1 and d2 areas are copied in one pass using the lengths in the old header
324 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
325 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
329 // --- these are all version agnostic -----------------------------------
330 nm->mtype = old_msg->mtype;
331 nm->sub_id = old_msg->sub_id;
332 nm->len = old_msg->len; // length of data in the payload
333 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): the assignment of hdr is not visible; if it happens only on the default path
// then hdr is unset here for version 1 messages -- confirm.
335 nm->xaction = hdr->xid; // reference xaction
336 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
337 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
// assumes old_msg->len is a valid non-negative payload length -- TODO confirm callers never pass
// an mbuf whose len is still the -1 "no payload" marker
338 memcpy( nm->payload, old_msg->payload, old_msg->len );
344 This will clone a message with a change to the trace area in the header such that
345 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
346 The original message will be left unchanged, and a pointer to the new message is returned.
347 It is not possible to realloc buffers and change the data sizes.
// Builds a copy of old_msg whose trace area is tr_len bytes long. A new mbuf and transport buffer
// are allocated, the fixed header is copied, the trace length in the new header is changed, and
// the d1/d2 areas and the payload are copied to their new positions.
//   old_msg - message to copy
//   tr_len  - trace area size wanted in the new message
// The new transport buffer is zero filled before use, so the new trace area starts out as zeros.
// NOTE(review): declarations, the handling after each allocation failure, the case labels and the
// final return are not visible in this view.
349 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
350 rmr_mbuf_t* nm; // new message buffer
355 int tr_old_len; // trace size in the old message's header
356 int* alen; // convenience pointer to set total xmit len FIX ME!
357 int tpb_len; // total transmit buffer len (user space, rmr header and tp header)
359 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
361 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
364 memset( nm, 0, sizeof( *nm ) );
366 hdr = old_msg->header;
367 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
369 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
370 if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
372 tpb_len = mlen + TP_HDR_LEN;
373 if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
// the value printed for %d is the ENOMEM constant, not the requested size
374 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
377 memset( nm->tp_buf, 0, tpb_len );
378 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // DEBUGGING
// the leading sizeof(int) bytes of the buffer are replaced by the total buffer length
379 alen = (int *) nm->tp_buf;
380 *alen = tpb_len; // FIX ME: need a struct to go in these first bytes, not just dummy len
382 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
384 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
385 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr still points at old_msg->header here, so this memcpy has identical source
// and destination and nothing reaches nm->header; the payload pointer set on the next line then
// refers to the OLD buffer. Looks like the destination should be nm->header -- confirm.
387 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
388 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
391 default: // current message version always caught here
// NOTE(review): hdr was last visibly set to old_msg->header; presumably an unseen line repoints
// it at nm->header before this copy -- confirm.
393 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
394 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
396 if( RMR_D1_LEN( hdr ) ) {
397 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
399 if( RMR_D2_LEN( hdr ) ) {
400 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
403 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
407 // --- these are all version agnostic -----------------------------------
408 nm->mtype = old_msg->mtype;
409 nm->sub_id = old_msg->sub_id;
410 nm->len = old_msg->len; // length of data in the payload
// alloc_len excludes the TP_HDR_LEN that was added for the actual allocation (tpb_len)
411 nm->alloc_len = mlen; // length of allocated payload
413 nm->xaction = hdr->xid; // reference xaction
414 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
415 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
416 memcpy( nm->payload, old_msg->payload, old_msg->len );
422 For SI95 based transport all receives are driven through the threaded
423 ring and thus this function should NOT be called. If it is we will panic
424 and abort straight away.
// Placeholder kept so the receive entry point exists for the SI95 transport; it reports on stderr
// that it was called unexpectedly. Neither ctx nor old_msg is used by the visible statement.
// NOTE(review): the statement that terminates the process and any return are not visible in this view.
426 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
428 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" );
435 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
436 message buffer cannot be used to send, and the length information may or may
437 not be correct (it is set to the length received which might be more than the
438 bytes actually in the payload).
440 Mostly this supports the route table collector, but could be extended with an
441 API external function.
// Intended to receive a headerless (raw) message and describe it with an mbuf flagged MFL_RAW whose
// header and payload both point at the start of the received bytes.
// NOTE(review): the FIXME marker below suggests this body is disabled or unfinished; the lines
// that would show how (comment delimiters or preprocessor guards) are not visible in this view.
443 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
446 FIXME: not implemented yet
448 rmr_mbuf_t* msg = NULL; // msg received
449 size_t rsize; // nng needs to write back the size received... grrr
// NOTE(review): alloc_zcmsg() returns nil when the mbuf itself cannot be allocated, so the
// "no need to check" remark only holds for the transport buffer allocation.
454 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
// the receive call is commented out, so the state translated below is the RMR_OK value that was
// handed to alloc_zcmsg() rather than a transport result
457 //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
458 if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
// NOTE(review): tp_buf was obtained with malloc in alloc_zcmsg(), not from nng, yet the nng
// message accessors are applied to it here and below -- leftover from the nng implementation?
461 rsize = nng_msg_len( msg->tp_buf );
463 // do NOT use ref_tpbuf() here! Must fill these in manually.
464 msg->header = nng_msg_body( msg->tp_buf );
465 msg->len = rsize; // len is the number of bytes received
466 msg->alloc_len = rsize;
467 msg->mtype = UNSET_MSGTYPE; // raw message has no type
468 msg->sub_id = UNSET_SUBID; // nor a subscription id
// plain assignment: any flags set by alloc_zcmsg() (MFL_ZEROCOPY) are discarded
470 msg->flags = MFL_RAW;
471 msg->payload = msg->header; // payload is the whole thing; no header
474 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
481 This does the hard work of actually sending the message to the given socket. On success,
482 a new message struct is returned. On error, the original msg is returned with the state
483 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
484 buffer will not be allocated and returned (mostly for call() internal processing since
485 the return message from call() is a received buffer, not a new one).
487 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
488 validation has been done prior.
490 When msg->state is not ok, this function must set tp_state in the message as some API
491 functions return the message directly and do not propagate errno into the message.
// Writes mtype, sub_id and payload length into the message header in network byte order, stores
// the total send length at the front of the transport buffer and hands the buffer to SIsendt().
// A blocked send is retried only while retries is greater than zero.
//   ctx     - context (si_ctx for the send, our name/ip, allocation defaults)
//   msg     - message to send; pointers are assumed valid
//   nn_sock - socket/fd passed to SIsendt()
//   retries - retry budget; values <= 0 disable retrying
// When msg->state is RMR_OK afterwards, a new sendable buffer from alloc_zcmsg() is returned
// unless MFL_NOALLOC is set, in which case the message is freed. Otherwise msg->state is set to
// an RMR error value.
// NOTE(review): the loop opening, the retry decrement, the line that records the send result in
// msg->state, and the final returns are not visible in this view.
493 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
496 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
497 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
498 int tot_len; // total send length (hdr + user data + tp header)
500 // future: ensure that application did not overrun the XID buffer; last byte must be 0
502 hdr = (uta_mhdr_t *) msg->header;
503 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
504 hdr->sub_id = htonl( msg->sub_id );
505 hdr->plen = htonl( msg->len );
506 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
508 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
// strncpy does not terminate the destination when the source is RMR_MAX_SRC bytes or longer
509 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
510 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
521 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
// the length is written as a host order int over the first bytes of the transport buffer
522 *((int*) msg->tp_buf) = tot_len;
524 if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
525 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
527 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
528 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg: error!! sent state=%d\n", state );
530 if( retries > 0 && state == SI_ERR_BLOCKED ) {
531 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
533 if( retries > 0 ) { // only if we'll loop through again
534 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
539 state = 0; // don't loop
542 if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
547 } while( state && retries > 0 );
549 if( msg->state == RMR_OK ) { // successful send
550 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
// the trace length captured before the send is passed on as the override for the new buffer
551 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
553 rmr_free_msg( msg ); // not wanting a message back, trash this one
556 } else { // send failed -- return original message
// NOTE(review): 98 is an unnamed constant; its meaning cannot be determined from this file
557 if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
559 msg->state = RMR_ERR_RETRY; // errno will have nano reason
561 msg->state = RMR_ERR_SENDFAILED;
// NOTE(review): strerror() is given msg->state, which by now holds an RMR_ERR_* value rather
// than an errno value, so the text printed is unlikely to describe the failure.
564 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
571 send message with maximum timeout.
572 Accept a message and send it to an endpoint based on message type.
573 If NNG reports that the send attempt timed out, or should be retried,
574 RMr will retry for approximately max_to microseconds; rounded to the next
577 Allocates a new message buffer for the next send. If a message type has
578 more than one group of endpoints defined, then the message will be sent
579 in round robin fashion to one endpoint in each group.
581 An endpoint will be looked up in the route table using the message type and
582 the subscription id. If the subscription id is "UNSET_SUBID", then only the
583 message type is used. If the initial lookup, with a subid, fails, then a
584 second lookup using just the mtype is tried.
586 When msg->state is not OK, this function must set tp_state in the message as
587 some API functions return the message directly and do not propagate errno into
590 CAUTION: this is a non-blocking send. If the message cannot be sent, then
591 it will return with an error and errno set to eagain. If the send is
592 a limited fanout, then the returned status is the status of the last
// Routes msg by (sub_id, mtype): looks up the route table entry, then for each round-robin group
// picks an endpoint socket with uta_epsock_rr() and sends through send_msg(). A clone of the
// message is made so that a further group can still be sent to after the current send consumes
// the buffer. Per-endpoint send counters are updated from the resulting message state.
//   vctx   - context (uta_ctx_t) passed as a void pointer
//   msg    - message to send
//   max_to - caller's timeout; replaced by ctx->send_retries before sending
// On failure msg->state, errno and msg->tp_state are set (EINVAL, EBADMSG or ENXIO on the
// validation paths shown below). Returns the message carrying the state of the last attempt.
// NOTE(review): many control lines (nil guards, case labels, else branches, counters, closing
// braces) are not visible in this view; comments describe only the statements that are.
596 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
597 endpoint_t* ep; // end point that we're attempting to send to
598 rtable_ent_t* rte; // the route table entry which matches the message key
599 int nn_sock; // endpoint socket (fd in si case) for send
601 int group; // selected group to get socket for
602 int send_again; // true if the message must be sent again
603 rmr_mbuf_t* clone_m; // cloned message for an nth send
604 int sock_ok; // got a valid socket from round robin select
606 int ok_sends = 0; // track number of ok sends
608 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
609 errno = EINVAL; // if msg is null, this is their clue
// presumably guarded by a nil test on msg that is not visible here -- confirm
611 msg->state = RMR_ERR_BADARG;
612 errno = EINVAL; // must ensure it's not eagain
613 msg->tp_state = errno;
618 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
619 if( msg->header == NULL ) {
620 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
621 msg->state = RMR_ERR_NOHDR;
622 errno = EBADMSG; // must ensure it's not eagain
623 msg->tp_state = errno;
// NOTE(review): whether this overwrite of max_to is conditional cannot be seen in this view
628 max_to = ctx->send_retries; // convert to retries
631 if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
632 if( ctx->flags & CTXFL_WARN ) {
633 fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
635 msg->state = RMR_ERR_NOENDPT;
636 errno = ENXIO; // must ensure it's not eagain
637 msg->tp_state = errno;
638 return msg; // caller can resend (maybe) or free
641 send_again = 1; // force loop entry
642 group = 0; // always start with group 0
643 while( send_again ) {
// uta_epsock_rr() fills in nn_sock and ep and updates send_again through the pointers passed
644 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups
646 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
647 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
651 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
653 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
654 if( clone_m == NULL ) {
655 msg->state = RMR_ERR_SENDFAILED;
657 msg->tp_state = errno;
658 if( ctx->flags & CTXFL_WARN ) {
659 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
664 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
665 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
666 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
668 if( msg != NULL ) { // returned message indicates send error of some sort
669 rmr_free_msg( msg ); // must ditch one; pick msg so we don't have to unfiddle flags
673 msg = clone_m; // clone will be the next to send
676 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
679 fprintf( stderr, "[DBUG] mtosend_msg: send returned nil message!\n" );
// endpoint counters are bumped according to the state of the message returned by the send
684 if( ep != NULL && msg != NULL ) {
685 switch( msg->state ) {
687 ep->scounts[EPSC_GOOD]++;
691 ep->scounts[EPSC_TRANS]++;
695 ep->scounts[EPSC_FAIL]++;
696 uta_ep_failed( ep ); // sending to ep failed; set up to reconnect
702 if( ctx->flags & CTXFL_WARN ) {
703 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
706 msg->state = RMR_ERR_NOENDPT;
711 if( msg ) { // call functions don't get a buffer back, so a nil check is required
712 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
713 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
717 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
719 msg->tp_state = errno;
722 return msg; // last message carries the status of last/only send attempt
727 A generic wrapper to the real send to keep wormhole stuff agnostic.
728 We assume the wormhole function vetted the buffer so we don't have to.
// Sends msg straight to the socket recorded in the endpoint, bypassing route table lookup.
//   ctx - context handed through to send_msg()
//   ep  - endpoint whose nn_sock is used; dereferenced without a nil check
//   msg - message to send; not validated here
// The retry argument is -1; send_msg() only re-attempts while its retry value is greater than
// zero, so this results in a single send attempt. Returns whatever send_msg() returns.
730 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
731 return send_msg( ctx, msg, ep->nn_sock, -1 );