1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_nng_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
41 Translates the nng state passed in to one of ours that is suitable to put
42 into the message, and sets errno to something that might be useful.
43 If we don't have a specific RMr state, then we return the default (e.g.
// Translate an nng status code into an RMR state and set errno to match.
//   state     - status value as returned by an nng call
//   def_state - fallback RMR state; it is not referenced in the statements shown here
//               (presumably returned when no case matches -- TODO confirm)
46 static inline int xlate_nng_state( int state, int def_state ) {
54 case NNG_EAGAIN: // soft errors get retry as the RMr error
55 state = RMR_ERR_RETRY;
// a second case (label not shown) is also reported to the caller as retryable
60 state = RMR_ERR_RETRY;
// two further cases (labels not shown) both report EBADFD through errno
80 errno = EBADFD; // file des not in a good state for the operation
85 errno = EBADFD; // file des not in a good state for the operation
99 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
100 a new message struct as well. Size is the size of the zc buffer to allocate (not
101 including our header). If size is 0, then the buffer allocated is the size previously
102 allocated (if msg is !nil) or the default size given at initialisation).
104 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
105 the context value is used.
107 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
108 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Build a sendable message: obtain (or reuse) an rmr_mbuf_t, attach a freshly allocated
// nng message as its transport buffer, and initialise the RMR header inside that buffer.
//   ctx   - context supplying trace_data_len, d1_len, d2_len, max_plen, my_name and my_ip
//   msg   - existing mbuf to reuse; a new one is malloc()ed on the path that reports
//           "cannot get memory for message" (the guarding test is not shown here)
//   size  - payload bytes wanted; when not > 0 ctx->max_plen is used instead
//   state - value intended for msg->state (see NOTE(review) where it is stored)
//   trlo  - trace length override; used when > 0, otherwise ctx->trace_data_len
// The mbuf pointer is the declared return type; the return statements are not shown here.
111 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
112 size_t mlen; // size of the transport buffer that we'll allocate
113 uta_mhdr_t* hdr; // convenience pointer
114 int tr_len; // trace data len (default or override)
116 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
// total buffer = fixed header + trace area + data1 + data2 + payload
118 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
119 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
122 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
124 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
// reuse path: the computed mlen is replaced with the length recorded in the old mbuf
128 mlen = msg->alloc_len; // msg given, allocate the same size as before
// wipes every field of the mbuf, so flags, pointers and lengths all restart from zero
131 memset( msg, 0, sizeof( *msg ) );
// NOTE(review): the nng result is assigned to the state parameter; after a successful
// allocation state is 0, which is what later lands in msg->state -- confirm intended
133 if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
// NOTE(review): the message prints the ENOMEM constant, not the nng code held in state
134 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
135 abort( ); // toss out a core file for this
138 msg->header = nng_msg_body( msg->tp_buf );
// NOTE(review): the header is written here before the NULL test on the next statement
139 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
140 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
141 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
142 hdr->sub_id = htonl( UNSET_SUBID );
143 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): header records ctx->trace_data_len although mlen was sized with tr_len,
// so a trlo override is not reflected in the header -- confirm whether tr_len was meant
144 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
145 SET_HDR_D1_LEN( hdr, ctx->d1_len );
146 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
148 msg->len = 0; // length of data in the payload
149 msg->alloc_len = mlen; // length of allocated transport buffer
150 msg->sub_id = UNSET_SUBID;
151 msg->mtype = UNSET_MSGTYPE;
152 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
153 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
154 msg->state = state; // fill in caller's state (likely the state of the last operation)
155 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
// NOTE(review): strncpy() leaves the field unterminated when the source is RMR_MAX_SRC
// bytes or longer; the sizes of src and srcip are not visible here -- TODO confirm
156 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
157 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
159 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
165 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
166 NNG receive must allocate that on its own.
// Allocate and zero a bare rmr_mbuf_t with no transport buffer attached.
//   ctx   - context; not referenced in the statements shown here
//   state - not referenced in the statements shown here; msg->state is set to
//           RMR_ERR_UNSET below (NOTE(review): confirm the parameter is really unused)
// The mbuf pointer is the declared return type; the return statement is not shown here.
168 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
170 uta_mhdr_t* hdr; // convenience pointer
173 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
// NOTE(review): the text names rmr_alloc_zc although this is alloc_mbuf
175 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
// every field starts at zero; only the fields assigned below differ
179 memset( msg, 0, sizeof( *msg ) );
181 msg->sub_id = UNSET_SUBID;
182 msg->mtype = UNSET_MSGTYPE;
185 msg->len = -1; // no payload; invalid len
189 msg->state = RMR_ERR_UNSET;
196 This accepts a message with the assumption that only the tp_buf pointer is valid. It
197 sets all of the various header/payload/xaction pointers in the mbuf to the proper
198 spot in the transport layer buffer. The len in the header is assumed to be the
199 allocated len (a receive buffer that nng created);
201 The alen parm is the assumed allocated length; assumed because it's a value likely
202 to have come from nng receive and the actual alloc len might be larger, but we
203 can only assume this is the total usable space.
205 This function returns the message with an error state set if it detects that the
206 received message might have been truncated. Check is done here as the calculation
207 is somewhat based on header version.
// Point the mbuf fields (header, payload, xaction, len, mtype, sub_id) into the nng
// transport buffer already attached as msg->tp_buf, choosing the layout by header version.
//   msg  - mbuf whose tp_buf is valid; all other fields are overwritten here
//   alen - usable length of the transport buffer; stored as msg->alloc_len
// If the sender's payload length exceeds what fits after the header, msg->state is set
// to RMR_ERR_TRUNC and msg->len is reduced to the space available.
209 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
210 uta_mhdr_t* hdr = NULL; // current header
211 uta_v1mhdr_t* v1hdr; // version 1 header
213 int hlen; // header len to use for a truncation check
215 msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
216 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
// a raw (host order) value of 1 identifies a version 1 sender; the field is rewritten in
// network order so that the ntohl() conversions used elsewhere see a consistent value
218 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
220 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
222 ver = ntohl( v1hdr->rmr_ver );
// ---- version 1 layout: payload follows the fixed v1 header directly ----
227 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
228 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
229 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
231 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
232 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
233 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
234 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
236 hlen = sizeof( uta_v1mhdr_t );
// ---- any other version: payload position and header length come from the macros ----
239 default: // current version always lands here
240 hdr = (uta_mhdr_t *) msg->header;
241 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
242 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
244 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
245 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
246 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
247 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
248 msg->sub_id = ntohl( hdr->sub_id );
249 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): if hlen can exceed alloc_len the subtraction may go negative or wrap,
// depending on the types of len and alloc_len (not visible here) -- TODO confirm
253 if( msg->len > (msg->alloc_len - hlen ) ) {
254 msg->state = RMR_ERR_TRUNC;
255 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
262 This will clone a message into a new zero copy buffer and return the cloned message.
// Duplicate old_msg into a new mbuf backed by a new nng message of the same allocated
// length, copying the header area and old_msg->len bytes of payload.
//   old_msg - message to copy; it is only read by the statements shown here, except on
//             the v1 path flagged below
// The new mbuf pointer is the declared return type; the return statements are not shown.
264 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
265 rmr_mbuf_t* nm; // new message buffer
271 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
273 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
276 memset( nm, 0, sizeof( *nm ) );
278 mlen = old_msg->alloc_len; // length allocated before
279 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
// NOTE(review): the message prints the ENOMEM constant, not the nng code held in state
280 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
284 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
285 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
286 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr was set to old_msg->header just above, so unless it is repointed
// on a line not shown this memcpy has identical source and destination, and the payload
// pointer assigned next refers into the OLD buffer rather than the clone -- confirm
288 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
289 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
292 default: // current message always caught here
// copies the fixed header plus the trace, data1 and data2 areas in one block
294 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
295 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
299 // --- these are all version agnostic -----------------------------------
300 nm->mtype = old_msg->mtype;
301 nm->sub_id = old_msg->sub_id;
302 nm->len = old_msg->len; // length of data in the payload
303 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): hdr is only known to be assigned on the default path -- confirm it is
// valid here when the version 1 case was taken
305 nm->xaction = hdr->xid; // reference xaction
306 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
307 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
308 memcpy( nm->payload, old_msg->payload, old_msg->len );
314 This will clone a message with a change to the trace area in the header such that
315 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
316 The original message will be left unchanged, and a pointer to the new message is returned.
317 It is not possible to realloc buffers and change the data sizes.
// Copy old_msg into a new mbuf whose trace area is tr_len bytes long; the new transport
// buffer is sized as the old allocated length adjusted by the change in trace length.
//   old_msg - message to copy from
//   tr_len  - trace area size wanted in the copy
// The trace bytes themselves are not copied by the statements shown here.
// The new mbuf pointer is the declared return type; the return statements are not shown.
319 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
320 rmr_mbuf_t* nm; // new message buffer
// NOTE(review): despite the trailing comment, this holds the trace size of the OLD message
325 int tr_old_len; // tr size in new buffer
327 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
329 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
332 memset( nm, 0, sizeof( *nm ) );
334 hdr = old_msg->header;
335 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
337 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
338 if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
339 if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
// NOTE(review): the message prints the ENOMEM constant, not the nng code held in state
340 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
344 nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
345 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
346 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr points at old_msg->header, so unless it is repointed on a line not
// shown this copies the old header onto itself and leaves payload inside the old buffer
348 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
349 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
352 default: // current message version always caught here
354 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
// the new trace length must be in place before the DATA1/DATA2/PAYLOAD address macros
// are applied to the new header
355 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
357 if( RMR_D1_LEN( hdr ) ) {
358 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
360 if( RMR_D2_LEN( hdr ) ) {
361 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
364 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
368 // --- these are all version agnostic -----------------------------------
369 nm->mtype = old_msg->mtype;
370 nm->sub_id = old_msg->sub_id;
371 nm->len = old_msg->len; // length of data in the payload
372 nm->alloc_len = mlen; // length of allocated payload
374 nm->xaction = hdr->xid; // reference xaction
375 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
376 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
// NOTE(review): copies old_msg->len bytes with no check against the new buffer size
377 memcpy( nm->payload, old_msg->payload, old_msg->len );
383 This is the receive work horse used by the outer layer receive functions.
384 It waits for a message to be received on our listen socket. If old msg
385 is passed in, then we assume we can use it instead of allocating a new
386 one, else a new block of memory is allocated.
388 This allocates a zero copy message so that if the user wishes to call
389 rmr_rts_msg() the send is zero copy.
391 The nng timeout on send is at the ms level which is a tad too long for
392 our needs. So, if NNG returns eagain or timedout (we don't set one)
393 we will loop up to 5 times with a 10 microsecond delay between each
394 attempt. If at the end of this set of retries NNG is still saying
395 eagain/timeout we'll return to the caller with that set in errno.
396 Right now this is only for zero-copy buffers (they should all be zc
400 In the NNG msg world it must allocate the receive buffer rather
401 than accepting one that we allocated from their space and could
402 reuse. They have their reasons I guess. Thus, we will free
403 the old transport buffer if user passes the message in; at least
404 our mbuf will be reused.
406 When msg->state is not ok, this function must set tp_state in the message as some API
407 functions return the message directly and do not propagate errno into the message.
// Receive one message from ctx->nn_sock into an mbuf and set up its header/payload
// references.
//   ctx     - context supplying the listen socket nn_sock
//   old_msg - mbuf offered for reuse (how it is adopted is on lines not shown here)
// Any transport buffer already attached is released with nng_msg_free() before the
// receive, because nng_recvmsg() supplies its own buffer. On a failed receive
// msg->tp_state is loaded from errno. The mbuf pointer is the declared return type.
409 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
411 rmr_mbuf_t* msg = NULL; // msg received
413 size_t rsize; // nng needs to write back the size received... grrr
417 if( msg->tp_buf != NULL ) {
418 nng_msg_free( msg->tp_buf );
423 msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
// the raw nng code is parked in msg->state and then replaced by its RMR translation
432 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
433 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
434 msg->tp_state = errno;
439 if( msg->tp_buf == NULL ) { // if state is good this _should_ not be nil, but paranoia says check anyway
440 msg->state = RMR_ERR_EMPTY;
445 rsize = nng_msg_len( msg->tp_buf );
// anything shorter than a version 1 header cannot be an RMR message
446 if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here
447 ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed
448 hdr = (uta_mhdr_t *) msg->header;
449 msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
451 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
452 msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
// too-short receive: flag the buffer as empty and leave type and sub-id unset
454 msg->state = RMR_ERR_EMPTY;
457 msg->alloc_len = rsize;
460 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
461 msg->mtype = UNSET_MSGTYPE;
462 msg->sub_id = UNSET_SUBID;
469 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
470 message buffer cannot be used to send, and the length information may or may
471 not be correct (it is set to the length received which might be more than the
472 bytes actually in the payload).
474 Mostly this supports the route table collector, but could be extended with an
475 API external function.
// Receive a headerless (raw) message from ctx->nn_sock; the whole received buffer is
// exposed as the payload and the mbuf is flagged MFL_RAW.
//   ctx     - context supplying the listen socket nn_sock
//   old_msg - not referenced in the statements shown here
// Declared to return void*; the value returned is on lines not shown here.
477 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
479 rmr_mbuf_t* msg = NULL; // msg received
480 size_t rsize; // nng needs to write back the size received... grrr
485 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
// NOTE(review): alloc_zcmsg() attached an nng buffer to msg->tp_buf and nng_recvmsg()
// overwrites that pointer; unless the first buffer is freed on a line not shown here
// it is leaked on every call -- confirm
488 msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
489 if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
492 rsize = nng_msg_len( msg->tp_buf );
494 // do NOT use ref_tpbuf() here! Must fill these in manually.
495 msg->header = nng_msg_body( msg->tp_buf );
496 msg->len = rsize; // len is the number of bytes received
497 msg->alloc_len = rsize;
498 msg->mtype = UNSET_MSGTYPE; // raw message has no type
499 msg->sub_id = UNSET_SUBID; // nor a subscription id
// plain assignment: every flag previously set on the mbuf is dropped
501 msg->flags = MFL_RAW;
502 msg->payload = msg->header; // payload is the whole thing; no header
505 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
511 This does the hard work of actually sending the message to the given socket. On success,
512 a new message struct is returned. On error, the original msg is returned with the state
513 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
514 buffer will not be allocated and returned (mostly for call() internal processing since
515 the return message from call() is a received buffer, not a new one).
517 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
518 validation has been done prior.
520 When msg->state is not ok, this function must set tp_state in the message as some API
521 functions return the message directly and do not propagate errno into the message.
// Push msg out on nn_sock. The mbuf values (mtype, sub_id, len) are first written into
// the transport header in network byte order; zero-copy buffers go out via nng_sendmsg()
// with a bounded spin/retry loop, other buffers via nng_send().
//   ctx     - context; supplies my_name/my_ip when the source fields must be overlaid
//   msg     - message to send; pointer validation is assumed to be done by the caller
//   nn_sock - destination socket
//   retries - retry budget for NNG_EAGAIN / NNG_ETIMEDOUT; retry only happens while > 0
// On a good send either a fresh buffer from alloc_zcmsg() is returned or, when
// MFL_NOALLOC is set, the message is released with rmr_free_msg(). The failure path
// translates the state before the debug print.
523 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
526 int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
527 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
528 int tr_len; // trace len in sending message so we alloc new message with same trace size
530 // future: ensure that application did not overrun the XID buffer; last byte must be 0
532 hdr = (uta_mhdr_t *) msg->header;
533 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
534 hdr->sub_id = htonl( msg->sub_id );
535 hdr->plen = htonl( msg->len );
536 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
538 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
// NOTE(review): strncpy() does not terminate the field when the source fills it
539 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
540 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
545 if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
547 if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
// transient failure: burn the spin budget first, then yield the CPU briefly
549 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
550 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
552 if( retries > 0 ) { // only if we'll loop through again
553 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
// clearing state makes the loop condition below false
558 state = 0; // don't loop
559 //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
564 msg->header = NULL; // nano frees; don't risk accessing later by mistake
568 } while( state && retries > 0 );
570 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
571 msg->state = RMR_ERR_SENDFAILED;
573 msg->tp_state = errno;
// NOTE(review): the length sent is fixed header + payload only; trace/data1/data2 sizes
// are not included -- confirm this is right for non zero-copy buffers
577 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
579 //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
584 if( msg->state == RMR_OK ) { // successful send
585 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
// size 0 with the saved trace length: the replacement buffer keeps the same trace size
586 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
588 rmr_free_msg( msg ); // not wanting a message back, trash this one
591 } else { // send failed -- return original message
592 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
594 msg->state = RMR_ERR_RETRY; // errno will have nano reason
596 msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
// NOTE(review): msg->state has been translated by this point, yet it is handed to
// strerror() as if it were an errno value -- confirm the intended argument
599 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
606 send message with maximum timeout.
607 Accept a message and send it to an endpoint based on message type.
608 If NNG reports that the send attempt timed out, or should be retried,
609 RMr will retry for approximately max_to microseconds; rounded to the next
612 Allocates a new message buffer for the next send. If a message type has
613 more than one group of endpoints defined, then the message will be sent
614 in round robin fashion to one endpoint in each group.
616 An endpoint will be looked up in the route table using the message type and
617 the subscription id. If the subscription id is "UNSET_SUBID", then only the
618 message type is used. If the initial lookup, with a subid, fails, then a
619 second lookup using just the mtype is tried.
621 When msg->state is not OK, this function must set tp_state in the message as
622 some API functions return the message directly and do not propagate errno into
625 CAUTION: this is a non-blocking send. If the message cannot be sent, then
626 it will return with an error and errno set to eagain. If the send is
627 a limited fanout, then the returned status is the status of the last
// Route and send msg: look up the endpoint socket(s) for the message type / sub-id key
// and hand the message to send_msg() once per selected group.
//   vctx   - context pointer (cast to uta_ctx_t*); NULL is rejected with errno EINVAL
//   msg    - message to send; NULL is rejected, a NULL header yields RMR_ERR_NOHDR
//   max_to - retry budget passed to send_msg(); replaced with ctx->send_retries on a
//            path whose condition is not shown here
// Returns the message produced by the last send attempt, or the original message with
// state RMR_ERR_NOENDPT when no endpoint is found. Error paths also copy errno into
// msg->tp_state.
631 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
632 nng_socket nn_sock; // endpoint socket for send
634 int group; // selected group to get socket for
635 int send_again; // true if the message must be sent again
636 rmr_mbuf_t* clone_m; // cloned message for an nth send
637 int sock_ok; // got a valid socket from round robin select
638 uint64_t key; // mtype or sub-id/mtype sym table key
639 int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
642 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
643 errno = EINVAL; // if msg is null, this is their clue
// presumably guarded by a msg != NULL test on a line not shown -- TODO confirm
645 msg->state = RMR_ERR_BADARG;
646 errno = EINVAL; // must ensure it's not eagain
647 msg->tp_state = errno;
652 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
653 if( msg->header == NULL ) {
654 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
655 msg->state = RMR_ERR_NOHDR;
656 errno = EBADMSG; // must ensure it's not eagain
657 msg->tp_state = errno;
662 max_to = ctx->send_retries; // convert to retries
665 send_again = 1; // force loop entry
666 group = 0; // always start with group 0
668 key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
// a fallback lookup on mtype alone is only allowed when the caller set a sub-id
669 if( msg->sub_id != UNSET_SUBID ) {
670 altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
672 while( send_again ) {
673 sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
674 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
675 msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
// lookup fallback: rebuild the key without the sub-id and go round the loop again
678 if( altk_ok ) { // we can try with the alternate (no sub-id) key
680 key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
681 send_again = 1; // ensure we don't exit the while
// no endpoint under either key: hand the original message back with ENXIO
685 msg->state = RMR_ERR_NOENDPT;
686 errno = ENXIO; // must ensure it's not eagain
687 msg->tp_state = errno;
688 return msg; // caller can resend (maybe) or free
// more groups remain: send the original without asking for a replacement buffer and
// carry on with the clone
694 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
695 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
696 msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
697 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
700 // error do we need to count successes/errors, how to report some success, esp if last fails?
704 msg = clone_m; // clone will be the next to send
706 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
710 return msg; // last message carries the status of last/only send attempt
715 A generic wrapper to the real send to keep wormhole stuff agnostic.
716 We assume the wormhole function vetted the buffer so we don't have to.
// Send msg straight to the socket held by endpoint ep, bypassing route table lookup.
//   ctx - context handed through to send_msg()
//   ep  - endpoint whose nn_sock is the destination
//   msg - message to send; assumed already vetted by the caller
// The retries argument is -1, so the retries > 0 tests in send_msg() never trigger a retry.
// Returns whatever send_msg() returns.
718 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
719 return send_msg( ctx, msg, ep->nn_sock, -1 );