1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_si_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs nanomsg).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
// Debug aid: writes the label and the pointer value to stderr, then the 40 bytes
// starting at p as two-digit hex values, then a newline.
// No length is passed in, so the caller must supply at least 40 readable bytes at p.
41 static void dump_40( char *p, char* label ) {
45 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
47 for( i = 0; i < 40; i++ ) {
48 fprintf( stderr, "%02x ", (unsigned char) *(p+i) ); // cast so a byte with the high bit set still prints as two hex digits
50 fprintf( stderr, "\n" );
54 Translates the nng state passed in to one of ours that is suitable to put
55 into the message, and sets errno to something that might be useful.
56 If we don't have a specific RMr state, then we return the default (e.g.
59 The addition of the connection shut error code to the switch requires
60 that the NNG version at commit e618abf8f3db2a94269a (or after) be
61 used for compiling RMR.
// Takes a transport state and a default state and returns an int state value.
// NOTE(review): the function body is not visible in this view; presumably def_state is
// what is returned when no specific mapping exists for state -- confirm against the body.
63 static inline int xlate_si_state( int state, int def_state ) {
102 Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
103 a new message struct as well. Size is the size of the zc buffer to allocate (not
104 including our header). If size is 0, then the buffer allocated is the size previously
105 allocated (if msg is !nil) or the default size given at initialisation.
107 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
108 the context value is used.
110 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
111 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Prepares a sendable mbuf whose transport buffer is laid out as
//   [transport header: TP_HDR_LEN bytes][RMR header, trace, d1, d2][payload]
// Parms:
//   ctx   - context supplying default lengths, the mbuf ring (zcb_mring) and our name/ip
//   msg   - mbuf to reuse; when NULL one is taken from ctx->zcb_mring or malloc'd
//   size  - payload bytes wanted; a value <= 0 selects ctx->max_plen
//   state - value stored into msg->state
//   trlo  - trace length override; a value <= 0 selects ctx->trace_data_len
// Returns NULL when a new mbuf struct cannot be allocated; failure to allocate the
// transport buffer calls abort(). The normal return statement is not on a line visible here.
114 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
115 size_t mlen = -1; // size of the transport buffer that we'll allocate
116 uta_mhdr_t* hdr; // convenience pointer
117 int tr_len; // trace data len (default or override)
118 int* alen; // convenience pointer to set allocated len
120 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
122 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
123 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
124 mlen = sizeof( char ) * (mlen + TP_HDR_LEN); // finally add the transport header len
// no mbuf supplied and nothing available on the ring: a brand new struct is needed
126 if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
127 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
129 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
130 return NULL; // we used to exit -- that seems wrong
132 memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below
133 } else { // user message or message from the ring
134 if( mlen > msg->alloc_len ) { // current allocation is too small
135 msg->alloc_len = 0; // force tp_buffer realloc below
140 mlen = msg->alloc_len; // msg given, allocate the same size as before
144 msg->rts_fd = -1; // must force to be invalid; not a received message that can be returned
// an alloc_len of 0 (zeroed new mbuf, or forced above) is what triggers the malloc here
146 if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
147 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
148 abort( ); // toss out a core file for this
152 memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitialised use if we don't set
153 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher
// the first int of the transport header carries the total buffer length (overwrites the start of the eyecatcher)
155 alen = (int *) msg->tp_buf;
156 *alen = mlen; // FIX ME: need a struct to go in these first bytes, not just dummy len
158 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
159 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
// NOTE(review): msg->header was already written through by the memset above, so this NULL test cannot protect anything
160 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
161 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
162 hdr->sub_id = htonl( UNSET_SUBID );
163 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): mlen was sized with tr_len (which may be the trlo override), but the header records ctx->trace_data_len -- confirm which is intended
164 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
165 SET_HDR_D1_LEN( hdr, ctx->d1_len );
166 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
168 msg->len = 0; // length of data in the payload
169 msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
170 msg->sub_id = UNSET_SUBID;
171 msg->mtype = UNSET_MSGTYPE;
172 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
173 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
174 msg->state = state; // fill in caller's state (likely the state of the last operation)
175 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
176 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
// NOTE(review): strncpy leaves the field unterminated when the source is RMR_MAX_SRC bytes or longer
177 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
178 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
180 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
186 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
187 transport receive should allocate that on its own.
// Supplies an mbuf struct with no transport buffer attached: one is taken from
// ctx->zcb_mring when available (its tp_buf is released), otherwise one is malloc'd.
// Returns NULL when the malloc fails.
// NOTE(review): the use of the state parm is not on a line visible here; presumably it is
// stored in msg->state -- confirm.
189 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
191 uta_mhdr_t* hdr; // convenience pointer
194 if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
196 free( msg->tp_buf ); // caller doesn't want it -- future put this on an accumulation ring
199 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
200 fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
201 return NULL; // this used to exit, but that seems wrong
205 memset( msg, 0, sizeof( *msg ) ); // clears every field, including any stale tp_buf pointer
207 msg->sub_id = UNSET_SUBID;
208 msg->mtype = UNSET_MSGTYPE;
211 msg->len = -1; // no payload; invalid len
217 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
223 This accepts a message with the assumption that only the tp_buf pointer is valid. It
224 sets all of the various header/payload/xaction pointers in the mbuf to the proper
225 spot in the transport layer buffer. The len in the header is assumed to be the
226 allocated len (a receive buffer that nng created);
228 The alen parm is the assumed allocated length; assumed because it's a value likely
229 to have come from si receive and the actual alloc len might be larger, but we
230 can only assume this is the total usable space. Because we are managing a transport
231 header in the first n bytes of the real msg, we must adjust this length down by the
232 size of the tp header (for testing 50 bytes, but this should change to a struct if
233 we adopt this interface).
235 This function returns the message with an error state set if it detects that the
236 received message might have been truncated. Check is done here as the calculation
237 is somewhat based on header version.
// Points the mbuf's header, payload and xaction fields into msg->tp_buf and fills
// len, mtype and sub_id from the RMR header found TP_HDR_LEN bytes into that buffer.
// Parms:
//   msg  - mbuf whose tp_buf is already set
//   alen - length recorded as msg->alloc_len
// When the length claimed by the header exceeds alloc_len less the header length, the
// state is set to RMR_ERR_TRUNC and msg->len is reduced to fit.
239 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
240 uta_mhdr_t* hdr = NULL; // current header
241 uta_v1mhdr_t* v1hdr; // version 1 header
243 int hlen; // header len to use for a truncation check
245 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN; // RMR header follows the transport header
247 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
249 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
251 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
253 ver = ntohl( v1hdr->rmr_ver );
// ---- version 1 header layout ----
258 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
259 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
260 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
262 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
263 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
264 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
265 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
267 hlen = sizeof( uta_v1mhdr_t );
270 default: // current version always lands here
271 hdr = (uta_mhdr_t *) msg->header;
272 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
273 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
275 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
276 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
277 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
278 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
279 msg->sub_id = ntohl( hdr->sub_id );
280 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): TP_HDR_LEN is not subtracted here; if alen counts the transport header
// (realloc_payload passes a length that does) the usable space is overstated -- confirm
284 if( msg->len > (msg->alloc_len - hlen ) ) {
285 msg->state = RMR_ERR_TRUNC;
286 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
293 This will clone a message into a new zero copy buffer and return the cloned message.
// Duplicates old_msg into a freshly malloc'd mbuf with its own transport buffer of
// old_msg->alloc_len + TP_HDR_LEN bytes, copying the header area and old_msg->len bytes
// of payload. old_msg is not validated here.
// What happens after the allocation failure messages (return vs. exit) is not on a line visible here.
295 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
296 rmr_mbuf_t* nm; // new message buffer
302 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
304 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
307 memset( nm, 0, sizeof( *nm ) );
309 mlen = old_msg->alloc_len; // length allocated before
310 if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
311 fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
315 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
316 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
317 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr was set from old_msg->header above; unless it is re-pointed on a line not
// visible here, this copies the old header onto itself and leaves nm->payload inside the OLD buffer
319 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
320 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
323 default: // current message always caught here
// NOTE(review): the assignment of hdr is not on a visible line; presumably it references nm->header -- confirm
325 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
326 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
330 // --- these are all version agnostic -----------------------------------
331 nm->mtype = old_msg->mtype;
332 nm->sub_id = old_msg->sub_id;
333 nm->len = old_msg->len; // length of data in the payload
334 nm->alloc_len = mlen; // length of allocated payload
// NOTE(review): hdr is used for every version; confirm it is valid on the version 1 path
336 nm->xaction = hdr->xid; // reference xaction
337 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
338 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
339 memcpy( nm->payload, old_msg->payload, old_msg->len ); // only the used part of the payload is copied
345 This will clone a message with a change to the trace area in the header such that
346 it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
347 The original message will be left unchanged, and a pointer to the new message is returned.
348 It is not possible to realloc buffers and change the data sizes.
// Builds a copy of old_msg whose trace area is tr_len bytes. The new transport buffer is
// sized as old alloc_len adjusted by the trace length difference, plus TP_HDR_LEN, and is
// zero filled before the header, the d1/d2 data and old_msg->len bytes of payload are copied in.
// The old trace data is not copied.
350 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
351 rmr_mbuf_t* nm; // new message buffer
356 int tr_old_len; // tr size in new buffer
357 int* alen; // convenience pointer to set total xmit len FIX ME!
358 int tpb_len; // total transmit buffer len (user space, rmr header and tp header)
360 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
362 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
365 memset( nm, 0, sizeof( *nm ) );
367 hdr = old_msg->header;
368 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
370 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
371 if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
373 tpb_len = mlen + TP_HDR_LEN;
374 if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
375 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
378 memset( nm->tp_buf, 0, tpb_len ); // the new trace area is never copied into, so it keeps these zeros
379 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // DEBUGGING
380 alen = (int *) nm->tp_buf;
381 *alen = tpb_len; // FIX ME: need a struct to go in these first bytes, not just dummy len
383 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
385 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
386 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): v1hdr was set from old_msg->header above; unless it is re-pointed on a line not
// visible here, this copies the old header onto itself and leaves nm->payload inside the OLD buffer
388 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
389 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
392 default: // current message version always caught here
// NOTE(review): hdr was last set to old_msg->header; presumably it is re-pointed at nm->header on a line not visible here -- confirm
394 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
395 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
397 if( RMR_D1_LEN( hdr ) ) {
398 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
400 if( RMR_D2_LEN( hdr ) ) {
401 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
404 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
408 // --- these are all version agnostic -----------------------------------
409 nm->mtype = old_msg->mtype;
410 nm->sub_id = old_msg->sub_id;
411 nm->len = old_msg->len; // length of data in the payload
412 nm->alloc_len = mlen; // length of allocated payload
414 nm->xaction = hdr->xid; // reference xaction
415 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
416 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
417 memcpy( nm->payload, old_msg->payload, old_msg->len ); // only the used part of the payload is copied
423 Realloc the message such that the payload is at least payload_len bytes.
424 The clone and copy options affect what portion of the original payload is copied to
425 the reallocated message, and whether or not the original payload is lost after the
426 reallocation process has finished.
429 The entire payload from the original message will be copied to the reallocated
433 Only the header (preserving return to sender information, message type, etc)
434 is preserved after reallocation; the payload used length is set to 0 and the
435 payload is NOT initialised/cleared.
438 The original message is preserved and a completely new message buffer and payload
439 are allocated (even if the size given is the same). A pointer to the new message
440 buffer is returned and it is the user application's responsibility to manage the
441 old buffer (e.g. free when not needed).
444 The old payload will be lost after reallocation. The message buffer pointer which
445 is returned will likely reference the same structure (don't depend on that).
449 If the message is not a message which was received, the mtype, sub-id, length values in the
450 RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
451 mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
452 resetting information in the mbuffer, this function will reset the mbuf values from the current
453 settings and NOT from the copied RMR header in transport buffer.
// Gives the message a transport buffer able to hold at least payload_len bytes of payload.
// Parms:
//   old_msg     - message to grow; rejected when NULL
//   payload_len - wanted payload size; rejected when <= 0
//   copy        - when set the old payload is carried into the new buffer, otherwise only the RMR header
//   clone       - when set a separate mbuf is allocated for the result
// The values returned on the reject and early-out paths are not on lines visible here.
455 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
456 rmr_mbuf_t* nm = NULL; // new message buffer when cloning
458 uta_mhdr_t* omhdr; // old message header
459 int tr_old_len; // tr size in new buffer
460 int old_psize = 0; // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
461 int hdr_len = 0; // length of RMR and transport headers in old msg
462 void* old_tp_buf; // pointer to the old tp buffer
463 int free_tp = 1; // free the transport buffer (old) when done (when not cloning)
464 int old_mt; // msg type and sub-id from the message passed in
467 int old_rfd; // rts file descriptor from old message
469 if( old_msg == NULL || payload_len <= 0 ) {
// snapshot the mbuf level values now; they are put back after ref_tpbuf() re-derives them from the header
474 old_mt = old_msg->mtype; // preserve mbuf info
475 old_sid = old_msg->sub_id;
476 old_len = old_msg->len;
477 old_rfd = old_msg->rts_fd;
479 old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN); // user payload size in orig message
481 if( !clone && payload_len <= old_psize ) { // not cloning and old is large enough; nothing to do
482 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
486 hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN; // with SI we manage the transport header; must include in len
487 old_tp_buf = old_msg->tp_buf;
490 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: cloning message\n" );
493 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
495 fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
498 memset( nm, 0, sizeof( *nm ) );
499 nm->rts_fd = old_rfd; // this is managed only in the mbuf; dup now
504 omhdr = old_msg->header;
505 mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
507 if( DEBUG ) fprintf( stderr, "[DBUG] reallocate for payload increase. new message size: %d\n", (int) mlen );
508 if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
509 fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
513 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old
// NOTE(review): both copies below start at nm->header, so whatever this sets is presumably
// overwritten; it does give RMR_HDR_LEN( nm->header ) a value for the debug message -- confirm
514 SET_HDR_LEN( nm->header );
516 if( copy ) { // if we need to copy the old payload too
517 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
// old_psize + RMR_HDR_LEN is alloc_len - TP_HDR_LEN: everything in the old buffer after the transport header
518 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
519 } else { // just need to copy header
520 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
521 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
524 ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
527 nm->mtype = -1; // didn't copy payload, so mtype, sub-id, and rts fd are invalid
529 nm->len = 0; // and len is 0
531 nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message
533 nm->sub_id = old_sid;
537 free( old_tp_buf ); // we did not clone, so free b/c no references
544 For SI95 based transport all receives are driven through the threaded
545 ring and thus this function should NOT be called. If it is we will panic
546 and abort straight away.
// Placeholder receive entry point that is not expected to be called with this transport.
// Writes the panic message below to stderr; the message says the process aborts, but the
// terminating call itself is not on a line visible here.
548 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
550 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" );
557 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
558 message buffer cannot be used to send, and the length information may or may
559 not be correct (it is set to the length received which might be more than the
560 bytes actually in the payload).
562 Mostly this supports the route table collector, but could be extended with an
563 API external function.
// Raw (headerless) receive left over from the nng implementation.
// NOTE(review): the nng receive call is commented out below, yet nng_msg_len() and
// nng_msg_body() are applied to a tp_buf that alloc_zcmsg() obtains with malloc() --
// confirm whether this body is live code or is disabled on lines not visible here.
565 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
568 FIXME: do we need this in the SI world? The only user was the route table collector
570 rmr_mbuf_t* msg = NULL; // msg received
571 size_t rsize; // nng needs to write back the size received... grrr
// NOTE(review): alloc_zcmsg() returns NULL when it cannot allocate the mbuf struct, so the result is not guaranteed non-NULL
576 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
579 //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
580 if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
583 rsize = nng_msg_len( msg->tp_buf );
585 // do NOT use ref_tpbuf() here! Must fill these in manually.
586 msg->header = nng_msg_body( msg->tp_buf );
587 msg->len = rsize; // len is the number of bytes received
588 msg->alloc_len = rsize;
589 msg->mtype = UNSET_MSGTYPE; // raw message has no type
590 msg->sub_id = UNSET_SUBID; // nor a subscription id
592 msg->flags = MFL_RAW; // plain assignment: any flags set by alloc_zcmsg() are replaced
593 msg->payload = msg->header; // payload is the whole thing; no header
596 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
603 This does the hard work of actually sending the message to the given socket. On success,
604 a new message struct is returned. On error, the original msg is returned with the state
605 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
606 buffer will not be allocated and returned (mostly for call() internal processing since
607 the return message from call() is a received buffer, not a new one).
609 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
610 validation has been done prior.
612 When msg->state is not ok, this function must set tp_state in the message as some API
613 functions return the message directly and do not propagate errno into the message.
// Writes mtype, sub_id and payload length into the RMR header in network byte order, stores
// the total length in the first int of the transport buffer, and hands the buffer to SIsendt().
// Parms:
//   ctx     - context supplying the SI context and our name/ip
//   msg     - message to send; pointers are not validated here
//   nn_sock - socket/fd passed to SIsendt()
//   retries - the retry branch and the loop condition both require retries > 0, so a value
//             <= 0 results in a single attempt
// On success a fresh buffer from alloc_zcmsg() is returned unless MFL_NOALLOC is set, in
// which case the message is freed. On failure msg->state is set to an RMR error.
615 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
618 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
619 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
620 int tot_len; // total send length (hdr + user data + tp header)
622 // future: ensure that application did not overrun the XID buffer; last byte must be 0
624 hdr = (uta_mhdr_t *) msg->header;
625 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
626 hdr->sub_id = htonl( msg->sub_id );
627 hdr->plen = htonl( msg->len );
628 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
630 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
// NOTE(review): strncpy leaves the field unterminated when the source is RMR_MAX_SRC bytes or longer
631 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
632 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
643 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
644 *((int*) msg->tp_buf) = tot_len; // the first int of the transport header carries the length being sent
646 if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
647 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
649 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
650 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg: error!! sent state=%d\n", state );
// only a blocked send is retried
652 if( retries > 0 && state == SI_ERR_BLOCKED ) {
653 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
655 if( retries > 0 ) { // only if we'll loop through again
656 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
661 state = 0; // don't loop
664 if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
669 } while( state && retries > 0 );
671 if( msg->state == RMR_OK ) { // successful send
672 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
673 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
675 rmr_free_msg( msg ); // not wanting a message back, trash this one
678 } else { // send failed -- return original message
// NOTE(review): 98 is an unexplained literal; a named constant would make the intent checkable
679 if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
681 msg->state = RMR_ERR_RETRY; // errno will have nano reason
683 msg->state = RMR_ERR_SENDFAILED;
// NOTE(review): strerror() is given msg->state (an RMR state value), not errno, so the text printed may not describe the failure
686 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
693 send message with maximum timeout.
694 Accept a message and send it to an endpoint based on message type.
695 If NNG reports that the send attempt timed out, or should be retried,
696 RMr will retry for approximately max_to microseconds; rounded to the next
699 Allocates a new message buffer for the next send. If a message type has
700 more than one group of endpoints defined, then the message will be sent
701 in round robin fashion to one endpoint in each group.
703 An endpoint will be looked up in the route table using the message type and
704 the subscription id. If the subscription id is "UNSET_SUBID", then only the
705 message type is used. If the initial lookup, with a subid, fails, then a
706 second lookup using just the mtype is tried.
708 When msg->state is not OK, this function must set tp_state in the message as
709 some API functions return the message directly and do not propagate errno into
712 CAUTION: this is a non-blocking send. If the message cannot be sent, then
713 it will return with an error and errno set to eagain. If the send is
714 a limited fanout, then the returned status is the status of the last
// Routes a message: looks up the route table entry for the message's sub_id/mtype, then
// loops while uta_epsock_rr() reports more groups, sending through send_msg() each time.
// Parms:
//   vctx   - context pointer (cast to uta_ctx_t); NULL is rejected with errno EINVAL
//   msg    - message to send; NULL is rejected, as is a message with no header
//   max_to - retry allowance handed to send_msg()
// Returns the message carrying the state of the last send attempt. For each failure shown
// below, errno is set and copied into msg->tp_state.
718 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
719 endpoint_t* ep; // end point that we're attempting to send to
720 rtable_ent_t* rte; // the route table entry which matches the message key
721 int nn_sock; // endpoint socket (fd in si case) for send
723 int group; // selected group to get socket for
724 int send_again; // true if the message must be sent again
725 rmr_mbuf_t* clone_m; // cloned message for an nth send
726 int sock_ok; // got a valid socket from round robin select
728 int ok_sends = 0; // track number of ok sends
// ---- argument validation ----
730 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
731 errno = EINVAL; // if msg is null, this is their clue
733 msg->state = RMR_ERR_BADARG;
734 errno = EINVAL; // must ensure it's not eagain
735 msg->tp_state = errno;
740 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
741 if( msg->header == NULL ) {
742 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
743 msg->state = RMR_ERR_NOHDR;
744 errno = EBADMSG; // must ensure it's not eagain
745 msg->tp_state = errno;
// NOTE(review): the condition guarding this assignment is not on a line visible here
750 max_to = ctx->send_retries; // convert to retries
// ---- route lookup ----
753 if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
754 if( ctx->flags & CTXFL_WARN ) {
755 fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
757 msg->state = RMR_ERR_NOENDPT;
758 errno = ENXIO; // must ensure it's not eagain
759 msg->tp_state = errno;
760 return msg; // caller can resend (maybe) or free
// ---- one pass per round robin group ----
763 send_again = 1; // force loop entry
764 group = 0; // always start with group 0
765 while( send_again ) {
766 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups
768 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
769 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
773 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
// the message is cloned before sending so a copy remains for a later group
775 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
776 if( clone_m == NULL ) {
777 msg->state = RMR_ERR_SENDFAILED;
779 msg->tp_state = errno;
780 if( ctx->flags & CTXFL_WARN ) {
781 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
786 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
787 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
788 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
790 if( msg != NULL ) { // returned message indicates send error of some sort
791 rmr_free_msg( msg ); // must ditch one; pick msg so we don't have to unfiddle flags
795 msg = clone_m; // clone will be the next to send
798 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
801 fprintf( stderr, "[DBUG] mtosend_msg: send returned nil message!\n" );
// ---- per endpoint send counters, selected by the state of the send ----
806 if( ep != NULL && msg != NULL ) {
807 switch( msg->state ) {
809 ep->scounts[EPSC_GOOD]++;
813 ep->scounts[EPSC_TRANS]++;
817 ep->scounts[EPSC_FAIL]++;
818 uta_ep_failed( ep ); // sending to ep failed; set up to reconnect
824 if( ctx->flags & CTXFL_WARN ) {
825 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
828 msg->state = RMR_ERR_NOENDPT;
// ---- final status ----
833 if( msg ) { // call functions don't get a buffer back, so a nil check is required
834 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
835 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
839 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
841 msg->tp_state = errno;
844 return msg; // last message carries the status of last/only send attempt
849 A generic wrapper to the real send to keep wormhole stuff agnostic.
850 We assume the wormhole function vetted the buffer so we don't have to.
// Sends msg on the endpoint's socket by calling send_msg() with a retries value of -1.
// send_msg() only retries while retries > 0, so this makes a single attempt.
// Neither ep nor msg is checked here.
852 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
853 return send_msg( ctx, msg, ep->nn_sock, -1 );