1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_nng_static.c
23 Abstract: These are static send/receive primatives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
41 Translates the nng state passed in to one of ours that is suitable to put
42 into the message, and sets errno to something that might be useful.
43 If we don't have a specific RMr state, then we return the default (e.g.
46 static inline int xlate_nng_state( int state, int def_state ) {
54 case NNG_EAGAIN: // soft errors get retry as the RMr error
55 state = RMR_ERR_RETRY;
60 state = RMR_ERR_RETRY;
80 errno = EBADFD; // file des not in a good state for the operation
85 errno = EBADFD; // file des not in a good state for the operation
99 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
100 a new message struct as well. Size is the size of the zc buffer to allocate (not
101 including our header). If size is 0, then the buffer allocated is the size previously
102 allocated (if msg is !nil) or the default size given at initialisation).
104 The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
105 the context value is used.
107 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
108 are zero copy, however ONLY the message is zero copy. We now allocate and use
111 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
112 size_t mlen; // size of the transport buffer that we'll allocate
113 uta_mhdr_t* hdr; // convenience pointer
114 int tr_len; // trace data len (default or override)
116 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
118 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
119 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
122 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
124 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
128 mlen = msg->alloc_len; // msg given, allocate the same size as before
131 memset( msg, 0, sizeof( *msg ) );
133 if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
134 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
135 abort( ); // toss out a core file for this
138 msg->header = nng_msg_body( msg->tp_buf );
139 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
140 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
141 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
142 hdr->sub_id = htonl( UNSET_SUBID );
143 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
144 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
145 SET_HDR_D1_LEN( hdr, ctx->d1_len );
146 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
148 msg->len = 0; // length of data in the payload
149 msg->alloc_len = mlen; // length of allocated transport buffer
150 msg->sub_id = UNSET_SUBID;
151 msg->mtype = UNSET_MSGTYPE;
152 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
153 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
154 msg->state = state; // fill in caller's state (likely the state of the last operation)
155 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
156 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
158 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
164 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
165 NNG receive must allocate that on its own.
167 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
169 uta_mhdr_t* hdr; // convenience pointer
172 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
174 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
178 memset( msg, 0, sizeof( *msg ) );
180 msg->sub_id = UNSET_SUBID;
181 msg->mtype = UNSET_MSGTYPE;
184 msg->len = -1; // no payload; invalid len
188 msg->state = RMR_ERR_UNSET;
195 This accepts a message with the assumption that only the tp_buf pointer is valid. It
196 sets all of the various header/payload/xaction pointers in the mbuf to the proper
197 spot in the transport layer buffer. The len in the header is assumed to be the
198 allocated len (a receive buffer that nng created);
200 The alen parm is the assumed allocated length; assumed because it's a value likely
201 to have come from nng receive and the actual alloc len might be larger, but we
202 can only assume this is the total usable space.
204 This function returns the message with an error state set if it detects that the
205 received message might have been truncated. Check is done here as the calculation
206 is somewhat based on header version.
208 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
209 uta_mhdr_t* hdr; // current header
210 uta_v1mhdr_t* v1hdr; // version 1 header
212 int hlen; // header len to use for a truncation check
214 msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
215 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
217 if( v1hdr->rmr_ver == 1 ) { // bug in verion 1 didn't encode the version in network byte order
219 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
221 ver = ntohl( v1hdr->rmr_ver );
226 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
227 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
228 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
230 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
231 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
232 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
233 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
235 hlen = sizeof( uta_v1mhdr_t );
238 default: // current version always lands here
239 hdr = (uta_mhdr_t *) msg->header;
240 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
241 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
243 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
244 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
245 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
246 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
247 msg->sub_id = ntohl( hdr->sub_id );
248 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
252 if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error
253 msg->state = RMR_ERR_TRUNC;
254 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
261 This will clone a message into a new zero copy buffer and return the cloned message.
263 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
264 rmr_mbuf_t* nm; // new message buffer
270 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
272 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
275 memset( nm, 0, sizeof( *nm ) );
277 mlen = old_msg->alloc_len; // length allocated before
278 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
279 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
283 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
284 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
285 switch( ntohl( v1hdr->rmr_ver ) ) {
287 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
288 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
291 default: // current message always caught here
293 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
294 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
298 // --- these are all version agnostic -----------------------------------
299 nm->mtype = old_msg->mtype;
300 nm->sub_id = old_msg->sub_id;
301 nm->len = old_msg->len; // length of data in the payload
302 nm->alloc_len = mlen; // length of allocated payload
304 nm->xaction = hdr->xid; // reference xaction
305 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
306 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
307 memcpy( nm->payload, old_msg->payload, old_msg->len );
313 This will clone a message with a change to the trace area in the header such that
314 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
315 The orignal message will be left unchanged, and a pointer to the new message is returned.
316 It is not possible to realloc buffers and change the data sizes.
318 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
319 rmr_mbuf_t* nm; // new message buffer
324 int tr_old_len; // tr size in new buffer
326 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
328 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
331 memset( nm, 0, sizeof( *nm ) );
333 hdr = old_msg->header;
334 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
336 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
337 if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
338 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
339 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
343 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
344 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
345 switch( ntohl( v1hdr->rmr_ver ) ) {
347 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
348 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
351 default: // current message version always caught here
353 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
354 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
356 if( RMR_D1_LEN( hdr ) ) {
357 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
359 if( RMR_D2_LEN( hdr ) ) {
360 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
363 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
367 // --- these are all version agnostic -----------------------------------
368 nm->mtype = old_msg->mtype;
369 nm->sub_id = old_msg->sub_id;
370 nm->len = old_msg->len; // length of data in the payload
371 nm->alloc_len = mlen; // length of allocated payload
373 nm->xaction = hdr->xid; // reference xaction
374 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
375 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
376 memcpy( nm->payload, old_msg->payload, old_msg->len );
382 This is the receive work horse used by the outer layer receive functions.
383 It waits for a message to be received on our listen socket. If old msg
384 is passed in, the we assume we can use it instead of allocating a new
385 one, else a new block of memory is allocated.
387 This allocates a zero copy message so that if the user wishes to call
388 rmr_rts_msg() the send is zero copy.
390 The nng timeout on send is at the ms level which is a tad too long for
391 our needs. So, if NNG returns eagain or timedout (we don't set one)
392 we will loop up to 5 times with a 10 microsecond delay between each
393 attempt. If at the end of this set of retries NNG is still saying
394 eagain/timeout we'll return to the caller with that set in errno.
395 Right now this is only for zero-copy buffers (they should all be zc
399 In the NNG msg world it must allocate the receive buffer rather
400 than accepting one that we allocated from their space and could
401 reuse. They have their reasons I guess. Thus, we will free
402 the old transport buffer if user passes the message in; at least
403 our mbuf will be reused.
405 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
407 rmr_mbuf_t* msg = NULL; // msg received
409 size_t rsize; // nng needs to write back the size received... grrr
413 if( msg->tp_buf != NULL ) {
414 nng_msg_free( msg->tp_buf );
419 msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
428 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
429 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
433 if( msg->tp_buf == NULL ) { // if state is good this _should_ not be nil, but parninoia says check anyway
434 msg->state = RMR_ERR_EMPTY;
438 rsize = nng_msg_len( msg->tp_buf );
439 if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here
440 ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed
441 hdr = (uta_mhdr_t *) msg->header;
442 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
444 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
445 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
447 msg->state = RMR_ERR_EMPTY;
449 msg->alloc_len = rsize;
452 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
453 msg->mtype = UNSET_MSGTYPE;
454 msg->sub_id = UNSET_SUBID;
461 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
462 message buffer cannot be used to send, and the length information may or may
463 not be correct (it is set to the length received which might be more than the
464 bytes actually in the payload).
466 Mostly this supports the route table collector, but could be extended with an
467 API external function.
469 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
471 rmr_mbuf_t* msg = NULL; // msg received
472 size_t rsize; // nng needs to write back the size received... grrr
477 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
480 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
481 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
484 rsize = nng_msg_len( msg->tp_buf );
486 // do NOT use ref_tpbuf() here! Must fill these in manually.
487 msg->header = nng_msg_body( msg->tp_buf );
488 msg->len = rsize; // len is the number of bytes received
489 msg->alloc_len = rsize;
490 msg->mtype = UNSET_MSGTYPE; // raw message has no type
491 msg->sub_id = UNSET_SUBID; // nor a subscription id
493 msg->flags = MFL_RAW;
494 msg->payload = msg->header; // payload is the whole thing; no header
497 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
503 This does the hard work of actually sending the message to the given socket. On success,
504 a new message struct is returned. On error, the original msg is returned with the state
505 set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
506 buffer will not be allocated and returned (mostly for call() interal processing since
507 the return message from call() is a received buffer, not a new one).
509 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
510 validation has been done prior.
512 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
515 int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
516 int spin_retries = 1000; // if eagain/timeout we'll spin this many times before giving up the CPU
517 int tr_len; // trace len in sending message so we alloc new message with same trace size
519 // future: ensure that application did not overrun the XID buffer; last byte must be 0
521 hdr = (uta_mhdr_t *) msg->header;
522 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
523 hdr->sub_id = htonl( msg->sub_id );
524 hdr->plen = htonl( msg->len );
525 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
527 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
528 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
533 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
535 if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
537 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
538 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
540 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 miscrosec
544 state = 0; // don't loop
545 //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
550 msg->header = NULL; // nano frees; don't risk accessing later by mistake
554 } while( state && retries > 0 );
556 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
557 msg->state = RMR_ERR_SENDFAILED;
562 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
564 //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
569 if( msg->state == RMR_OK ) { // successful send
570 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
571 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
573 rmr_free_msg( msg ); // not wanting a meessage back, trash this one
576 } else { // send failed -- return original message
577 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
579 msg->state = RMR_ERR_RETRY; // errno will have nano reason
581 msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
584 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
591 send message with maximum timeout.
592 Accept a message and send it to an endpoint based on message type.
593 If NNG reports that the send attempt timed out, or should be retried,
594 RMr will retry for approximately max_to microseconds; rounded to the next
597 Allocates a new message buffer for the next send. If a message type has
598 more than one group of endpoints defined, then the message will be sent
599 in round robin fashion to one endpoint in each group.
601 An endpoint will be looked up in the route table using the message type and
602 the subscription id. If the subscription id is "UNSET_SUBID", then only the
603 message type is used. If the initial lookup, with a subid, fails, then a
604 second lookup using just the mtype is tried.
606 CAUTION: this is a non-blocking send. If the message cannot be sent, then
607 it will return with an error and errno set to eagain. If the send is
608 a limited fanout, then the returned status is the status of the last
612 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
613 nng_socket nn_sock; // endpoint socket for send
615 int group; // selected group to get socket for
616 int send_again; // true if the message must be sent again
617 rmr_mbuf_t* clone_m; // cloned message for an nth send
618 int sock_ok; // got a valid socket from round robin select
619 uint64_t key; // mtype or sub-id/mtype sym table key
620 int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
623 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
624 errno = EINVAL; // if msg is null, this is their clue
626 msg->state = RMR_ERR_BADARG;
627 errno = EINVAL; // must ensure it's not eagain
632 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
633 if( msg->header == NULL ) {
634 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
635 msg->state = RMR_ERR_NOHDR;
636 errno = EBADMSG; // must ensure it's not eagain
641 max_to = ctx->send_retries; // convert to retries
644 send_again = 1; // force loop entry
645 group = 0; // always start with group 0
647 key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
648 if( msg->sub_id != UNSET_SUBID ) {
649 altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
651 while( send_again ) {
652 sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
653 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
654 msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
657 if( altk_ok ) { // we can try with the alternate (no sub-id) key
659 key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
660 send_again = 1; // ensure we don't exit the while
664 msg->state = RMR_ERR_NOENDPT;
665 errno = ENXIO; // must ensure it's not eagain
666 return msg; // caller can resend (maybe) or free
672 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
673 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
674 msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
675 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
678 // error do we need to count successes/errors, how to report some success, esp if last fails?
682 msg = clone_m; // clone will be the next to send
684 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
688 return msg; // last message caries the status of last/only send attempt
693 A generic wrapper to the real send to keep wormhole stuff agnostic.
694 We assume the wormhole function vetted the buffer so we don't have to.
696 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
697 return send_msg( ctx, msg, ep->nn_sock, -1 );