1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: sr_si_static.c
23 Abstract: These are static send/receive primitives which (sadly)
24 differ based on the underlying protocol (nng vs SI95).
25 Split from rmr_nng.c for easier wormhole support.
27 Author: E. Scott Daniels
28 Date: 13 February 2019
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
// Debug helper: writes the label and pointer value, then the 40 bytes starting at p as
// two-digit hex values, to stderr. The caller must ensure p has at least 40 readable bytes.
// NOTE(review): label is passed straight to %s; confirm callers never pass NULL.
34 static void dump_40( char *p, char* label ) {
38 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
40 for( i = 0; i < 40; i++ ) {
41 fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
43 fprintf( stderr, "\n" );
47 Translates the nng state passed in to one of ours that is suitable to put
48 into the message, and sets errno to something that might be useful.
49 If we don't have a specific RMr state, then we return the default (e.g.
52 The addition of the connection shut error code to the switch requires
53 that the NNG version at commit e618abf8f3db2a94269a (or after) be
54 used for compiling RMR.
// Maps the state value passed in (presumably an SI95 transport state, given the name) to an
// RMR state. Per the description above, def_state is returned when there is no specific
// mapping, and errno is set to something useful. rcv_payload uses RMR_ERR_RCVFAILED as the default.
56 static inline int xlate_si_state( int state, int def_state ) {
95 Allocate a new zero copy transport buffer and put it into msg. If msg is nil, then a message
96 struct is taken from the free ring, or allocated if the ring is empty. Size is the size of the zc buffer to allocate (not
97 including our header). If size is not positive, then the buffer allocated is the size previously
98 allocated (if msg is !nil) or the default size given at initialisation.
100 The trlo (trace data length override) is used for trace length if >0. If <= 0, then
101 the context value is used.
103 NOTE: while accurate, the nng doc implies that both the msg buffer and data buffer
104 are zero copy, however ONLY the message is zero copy. We now allocate and use
// Prepares a sendable zero-copy mbuf.
// - If msg is NULL, an mbuf is taken from ctx->zcb_mring, or malloc'd when the ring is empty.
// - A given or recycled mbuf keeps its transport buffer unless that buffer is too small.
// Transport buffer size = TP_HDR_LEN + sizeof(uta_mhdr_t)
//   + trace length (trlo when > 0, otherwise ctx->trace_data_len) + d1 + d2
//   + payload (size when > 0, otherwise ctx->max_plen).
// Returns NULL if the mbuf cannot be malloc'd; calls abort() if the transport buffer cannot be.
107 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
108 size_t mlen = -1; // size of the transport buffer that we'll allocate
109 uta_mhdr_t* hdr; // convenience pointer
110 int tr_len; // trace data len (default or override)
111 int* alen; // convenience pointer to set allocated len
113 tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
115 mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
116 mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
117 mlen = sizeof( char ) * (mlen + TP_HDR_LEN); // finally add the transport header len
119 if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
120 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
122 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
123 return NULL; // we used to exit -- that seems wrong
125 memset( msg, 0, sizeof( *msg ) ); // tp_buffer will be allocated below
126 } else { // user message or message from the ring
127 if( mlen > msg->alloc_len ) { // current allocation is too small
128 msg->alloc_len = 0; // force tp_buffer realloc below
133 mlen = msg->alloc_len; // msg given, allocate the same size as before
137 msg->rts_fd = -1; // must force to be invalid; not a received message that can be returned
// a transport buffer is malloc'd only when alloc_len is 0 (new mbuf, or existing buffer too small)
139 if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
140 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
141 abort( ); // toss out a core file for this
145 memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitialised use if we don't set
146 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher
// the first int of the transport header records the transport buffer length
148 alen = (int *) msg->tp_buf;
149 *alen = mlen; // FIX ME: need a struct to go in these first bytes, not just dummy len
151 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
152 memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
153 if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
154 hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
155 hdr->sub_id = htonl( UNSET_SUBID );
156 SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
// NOTE(review): mlen was sized with tr_len (trlo when > 0), but the header records
// ctx->trace_data_len. send_msg passes the sent message's trace length as trlo, so when
// the two differ the payload offset and the allocation disagree. Confirm whether tr_len is intended.
157 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
158 SET_HDR_D1_LEN( hdr, ctx->d1_len );
159 //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
161 msg->len = 0; // length of data in the payload
162 msg->alloc_len = mlen; // length of allocated transport buffer (tp header + rmr header + trace/data + payload)
163 msg->sub_id = UNSET_SUBID;
164 msg->mtype = UNSET_MSGTYPE;
165 msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
166 msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
167 msg->state = state; // fill in caller's state (likely the state of the last operation)
168 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
169 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
// NOTE(review): strncpy leaves the field unterminated when the source is RMR_MAX_SRC bytes
// or longer; also confirm srcip is at least RMR_MAX_SRC bytes.
170 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
171 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
173 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
179 Allocates only the mbuf and does NOT allocate an underlying transport buffer since
180 transport receive should allocate that on its own.
// Returns an mbuf with no transport buffer attached; the transport receive supplies one.
// - An mbuf is recycled from ctx->zcb_mring when possible, and its old tp_buf is freed.
// - Otherwise a new mbuf is malloc'd; NULL is returned if malloc fails.
// The mbuf is then zeroed, sub_id/mtype are set to their UNSET values, len is set to -1,
// and ring is set to ctx->zcb_mring.
182 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
184 uta_mhdr_t* hdr; // convenience pointer
187 if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
189 free( msg->tp_buf ); // caller doesn't want it -- future put this on an accumulation ring
192 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
193 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
194 return NULL; // this used to exit, but that seems wrong
// NOTE(review): confirm this memset also runs on the recycled path; otherwise the tp_buf
// freed above is left as a dangling pointer in the mbuf.
198 memset( msg, 0, sizeof( *msg ) );
200 msg->sub_id = UNSET_SUBID;
201 msg->mtype = UNSET_MSGTYPE;
204 msg->len = -1; // no payload; invalid len
210 msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on each :(
216 This accepts a message with the assumption that only the tp_buf pointer is valid. It
217 sets all of the various header/payload/xaction pointers in the mbuf to the proper
218 spot in the transport layer buffer. The len in the header is assumed to be the
219 allocated len (a receive buffer that nng created);
221 The alen parm is the assumed allocated length; assumed because it's a value likely
222 to have come from si receive and the actual alloc len might be larger, but we
223 can only assume this is the total usable space. Because we are managing a transport
224 header in the first n bytes of the real msg, we must adjust this length down by the
225 size of the tp header (for testing 50 bytes, but this should change to a struct if
226 we adopt this interface).
228 This function sets an error state (RMR_ERR_TRUNC) in the message if it detects that the
229 received message might have been truncated. Check is done here as the calculation
230 is somewhat based on header version.
// Points msg's header, payload and xaction into msg->tp_buf; the RMR header begins
// TP_HDR_LEN bytes in. len, mtype and sub_id are loaded from the header according to its version.
// A version-1 header whose rmr_ver was written without byte-order conversion is rewritten in
// network order. alloc_len is set to alen.
// If the sender's payload length does not fit in alloc_len - hlen, state becomes
// RMR_ERR_TRUNC and len is clamped.
232 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
233 uta_mhdr_t* hdr = NULL; // current header
234 uta_v1mhdr_t* v1hdr; // version 1 header
236 int hlen; // header len to use for a truncation check
238 msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
240 v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
242 if( v1hdr->rmr_ver == 1 ) { // bug in version 1 didn't encode the version in network byte order
244 v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
246 ver = ntohl( v1hdr->rmr_ver );
251 msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
252 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
253 msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
255 msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
256 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
257 msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
258 msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
260 hlen = sizeof( uta_v1mhdr_t );
263 default: // current version always lands here
264 hdr = (uta_mhdr_t *) msg->header;
265 msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
266 msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
268 msg->payload = PAYLOAD_ADDR( hdr ); // at user payload
269 msg->xaction = &hdr->xid[0]; // point at transaction id in header area
270 msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
271 msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
272 msg->sub_id = ntohl( hdr->sub_id );
273 hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
// NOTE(review): hlen excludes TP_HDR_LEN, although alloc_len covers the whole tp buffer and
// the header starts TP_HDR_LEN bytes in. realloc_payload subtracts both when computing payload
// space. It may also exclude the trace/d1/d2 areas (clone_msg adds those to RMR_HDR_LEN
// separately). This check may therefore let len exceed the real payload space. Confirm.
277 if( msg->len > (msg->alloc_len - hlen ) ) {
278 msg->state = RMR_ERR_TRUNC;
279 msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
286 This will clone a message into a new zero copy buffer and return the cloned message.
// Returns a new mbuf with its own transport buffer. The buffer holds a copy of old_msg's header
// (for current-version headers, also the trace, d1 and d2 areas) and payload.
// mtype, sub_id, len and state are copied from the mbuf, and MFL_ZEROCOPY is forced on.
// On allocation failure a CRIT message is logged; mtosend_msg treats a NULL return as failure.
288 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) {
289 rmr_mbuf_t* nm; // new message buffer
295 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
297 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
// NOTE(review): this leaves nm->rts_fd at 0 unless it is set elsewhere; alloc_zcmsg forces -1
// for sendable buffers. Confirm the clone's rts_fd.
300 memset( nm, 0, sizeof( *nm ) );
302 mlen = old_msg->alloc_len; // length allocated before
// NOTE(review): old_msg->alloc_len already appears to include TP_HDR_LEN (alloc_zcmsg stores the
// full transport buffer size). This malloc therefore adds TP_HDR_LEN extra bytes that
// nm->alloc_len does not record. Harmless, but worth confirming.
303 if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
304 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
308 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
309 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
310 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): this copy is only correct if v1hdr has been re-pointed at nm->header after
// the switch expression. Otherwise it copies the old header onto itself. Confirm.
312 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
313 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
316 default: // current message always caught here
318 memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
319 nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
323 // --- these are all version agnostic -----------------------------------
324 nm->mtype = old_msg->mtype;
325 nm->sub_id = old_msg->sub_id;
326 nm->len = old_msg->len; // length of data in the payload
327 nm->alloc_len = mlen; // alloc_len carried over from the original message
329 nm->xaction = hdr->xid; // reference xaction
330 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
331 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
332 memcpy( nm->payload, old_msg->payload, old_msg->len );
338 This will clone a message with a change to the trace area in the header such that
339 it will be the tr_len passed in. Trace data is not copied to the cloned message; the new trace area is left zero-filled.
340 The original message will be left unchanged, and a pointer to the new message is returned.
341 It is not possible to realloc buffers and change the data sizes.
// Returns a new mbuf whose transport buffer differs from old_msg's only in the size of the
// trace area, which becomes tr_len bytes. The whole new buffer is zeroed before copying.
// For current-version headers, the RMR header, d1/d2 data and payload are copied.
// The new trace area therefore stays zero-filled. old_msg is not modified.
343 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len ) {
344 rmr_mbuf_t* nm; // new message buffer
349 int tr_old_len; // trace size in the old buffer
350 int* alen; // convenience pointer to set total xmit len FIX ME!
351 int tpb_len; // total transmit buffer len (user space, rmr header and tp header)
353 nm = (rmr_mbuf_t *) malloc( sizeof *nm );
355 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
358 memset( nm, 0, sizeof( *nm ) );
360 hdr = old_msg->header;
361 tr_old_len = RMR_TR_LEN( hdr ); // bytes in old header for trace
// a tr_len smaller than tr_old_len shrinks the buffer by the difference
363 mlen = old_msg->alloc_len + (tr_len - tr_old_len); // new length with trace adjustment
364 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
366 tpb_len = mlen + TP_HDR_LEN;
367 if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
// NOTE(review): this logs the constant ENOMEM, not the requested size (tpb_len).
368 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
371 memset( nm->tp_buf, 0, tpb_len );
372 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // DEBUGGING
373 alen = (int *) nm->tp_buf;
374 *alen = tpb_len; // FIX ME: need a struct to go in these first bytes, not just dummy len
376 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
378 v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
379 switch( ntohl( v1hdr->rmr_ver ) ) {
// NOTE(review): this copy is only correct if v1hdr has been re-pointed at nm->header after
// the switch expression. Otherwise it copies the old header onto itself. Confirm.
381 memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
382 nm->payload = (void *) v1hdr + sizeof( *v1hdr );
385 default: // current message version always caught here
387 memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
388 SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
390 if( RMR_D1_LEN( hdr ) ) {
391 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
393 if( RMR_D2_LEN( hdr ) ) {
394 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
397 nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
401 // --- these are all version agnostic -----------------------------------
402 nm->mtype = old_msg->mtype;
403 nm->sub_id = old_msg->sub_id;
404 nm->len = old_msg->len; // length of data in the payload
405 nm->alloc_len = mlen; // old alloc_len adjusted by the change in trace size
407 nm->xaction = hdr->xid; // reference xaction
408 nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
409 nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
410 memcpy( nm->payload, old_msg->payload, old_msg->len );
416 Realloc the message such that the payload is at least payload_len bytes.
417 The clone and copy options affect what portion of the original payload is copied to
418 the reallocated message, and whether or not the original payload is lost after the
419 reallocation process has finished.
422 The entire payload from the original message will be copied to the reallocated
426 Only the header (preserving return to sender information, message type, etc)
427 is preserved after reallocation; the payload used length is set to 0 and the
428 payload is NOT initialised/cleared.
431 The orignal message is preserved and a completely new message buffer and payload
432 are allocated (even if the size given is the same). A pointer to the new message
433 buffer is returned and it is the user application's responsibility to manage the
434 old buffer (e.g. free when not needed).
437 The old payload will be lost after reallocation. The message buffer pointer which
438 is returned will likely reference the same structure (don't depend on that).
442 If the message is not a message which was received, the mtype, sub-id, length values in the
443 RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
444 mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
445 resetting information in the mbuffer, this function will reset the mbuf values from the current
446 settings and NOT from the copied RMR header in transport buffer.
// Ensures the message payload can hold at least payload_len bytes.
// - A NULL old_msg or a non-positive payload_len is rejected up front.
// - Without clone, when the current payload space (old_psize) is already large enough,
//   nothing is reallocated.
// - With clone, a new mbuf is malloc'd and rts_fd is duplicated into it.
//   Without clone, the old transport buffer is freed once the new one is in place.
// - With copy, the header and the old payload are copied; otherwise only the RMR header is.
// - Per the inline comments, when the payload was not copied, mtype is set to -1 and len to 0.
//   Otherwise len and sub_id (and presumably mtype) are restored from the values saved on
//   entry rather than from the copied header, because a message that was not received
//   may carry a stale header.
448 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
449 rmr_mbuf_t* nm = NULL; // new message buffer when cloning
451 uta_mhdr_t* omhdr; // old message header
452 int tr_old_len; // trace size; NOTE(review): not referenced in the lines shown -- possibly unused
453 int old_psize = 0; // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
454 int hdr_len = 0; // length of RMR and transport headers in old msg
455 void* old_tp_buf; // pointer to the old tp buffer
456 int free_tp = 1; // free the transport buffer (old) when done (when not cloning)
457 int old_mt; // msg type and sub-id from the message passed in
460 int old_rfd; // rts file descriptor from old message
462 if( old_msg == NULL || payload_len <= 0 ) {
467 old_mt = old_msg->mtype; // preserve mbuf info
468 old_sid = old_msg->sub_id;
469 old_len = old_msg->len;
470 old_rfd = old_msg->rts_fd;
// NOTE(review): if RMR_HDR_LEN excludes the trace/d1/d2 areas (clone_msg adds them
// separately), old_psize also counts those bytes as payload space. Confirm.
472 old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN); // user payload size in orig message
474 if( !clone && payload_len <= old_psize ) { // not cloning and old is large enough; nothing to do
475 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
479 hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN; // with SI we manage the transport header; must include in len
480 old_tp_buf = old_msg->tp_buf;
483 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
486 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
488 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
491 memset( nm, 0, sizeof( *nm ) );
492 nm->rts_fd = old_rfd; // this is managed only in the mbuf; dup now
497 omhdr = old_msg->header;
498 mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
500 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
501 if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
502 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
506 nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN; // point at the new header and copy from old
507 SET_HDR_LEN( nm->header );
509 if( copy ) { // if we need to copy the old payload too
510 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
511 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
512 } else { // just need to copy header
513 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
// NOTE(review): tp_buf came from malloc (not zeroed). If RMR_HDR_LEN excludes trace/d1/d2,
// those areas are left uninitialised here even though the copied header declares their lengths.
514 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
517 ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
520 nm->mtype = -1; // didn't copy payload, so mtype, sub-id, and rts fd are invalid
522 nm->len = 0; // and len is 0
524 nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message
526 nm->sub_id = old_sid;
530 free( old_tp_buf ); // we did not clone, so free b/c no references
537 For SI95 based transport all receives are driven through the threaded
538 ring and thus this function should NOT be called. If it is we will panic
539 and abort straight away.
// Not expected to run with SI95: all receives go through the threaded ring (see the
// description above). It writes a loud diagnostic to stderr; per that description it then aborts.
541 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
543 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort! rcv_msg called and it shouldn't be\n" );
550 Receives a 'raw' message from a non-RMr sender (no header expected). The returned
551 message buffer cannot be used to send, and the length information may or may
552 not be correct (it is set to the length received which might be more than the
553 bytes actually in the payload).
555 Mostly this supports the route table collector, but could be extended with an
556 API external function.
// Legacy raw-receive path: no RMR header is expected, and the whole buffer is treated as payload.
// NOTE(review): as written it does not receive anything.
// - The nng_recvmsg call is commented out, so msg->state is still the RMR_OK passed to alloc_zcmsg.
// - nng_msg_len()/nng_msg_body() are then applied to a tp_buf that alloc_zcmsg obtained with
//   malloc, not to an nng message.
// Confirm whether this function is still reachable before relying on it.
558 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
561 FIXME: do we need this in the SI world? The only user was the route table collector
563 rmr_mbuf_t* msg = NULL; // msg received
564 size_t rsize; // nng needs to write back the size received... grrr
569 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // NOTE(review): alloc_zcmsg returns NULL if the mbuf malloc fails (only tp_buf failure aborts); not checked
572 //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
573 if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
576 rsize = nng_msg_len( msg->tp_buf );
578 // do NOT use ref_tpbuf() here! Must fill these in manually.
579 msg->header = nng_msg_body( msg->tp_buf );
580 msg->len = rsize; // len is the number of bytes received
581 msg->alloc_len = rsize;
582 msg->mtype = UNSET_MSGTYPE; // raw message has no type
583 msg->sub_id = UNSET_SUBID; // nor a subscription id
585 msg->flags = MFL_RAW;
586 msg->payload = msg->header; // payload is the whole thing; no header
589 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
596 This does the hard work of actually sending the message to the given socket. On success,
597 a re-initialised, sendable message is returned (the same mbuf is reused via alloc_zcmsg). On error, the original msg is returned with the state
598 set to a reasonable value. If the message being sent has MFL_NOALLOC set, then a new
599 buffer will not be allocated and returned (mostly for call() internal processing since
600 the return message from call() is a received buffer, not a new one).
602 Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
603 validation has been done prior.
605 When msg->state is not ok, this function must set tp_state in the message as some API
606 functions return the message directly and do not propagate errno into the message.
// Sends msg on nn_sock (an SI95 fd).
// - mtype, sub_id and len are written into the RMR header in network byte order.
// - The total length to send (used payload + payload offset + TP_HDR_LEN) is stored in the
//   first int of tp_buf.
// - When MFL_ADDSRC is set, our name and IP overwrite the header's source fields.
// Retries: a send that fails with SI_ERR_BLOCKED is retried while retries > 0. After
// spin_retries attempts the CPU is yielded with usleep(1). Per the inline comments, other
// errors end the loop.
// On success, unless MFL_NOALLOC is set, msg itself is handed to alloc_zcmsg and
// re-initialised as the next sendable buffer with the same trace length. With MFL_NOALLOC
// the message is freed.
// On failure, msg is returned with state RMR_ERR_RETRY or RMR_ERR_SENDFAILED.
608 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
611 int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
612 int tr_len; // trace len in sending message so we alloc new message with same trace sizes
613 int tot_len; // total send length (hdr + user data + tp header)
615 // future: ensure that application did not overrun the XID buffer; last byte must be 0
617 hdr = (uta_mhdr_t *) msg->header;
618 hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
619 hdr->sub_id = htonl( msg->sub_id );
620 hdr->plen = htonl( msg->len );
621 tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
623 if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
624 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
625 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
636 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
637 *((int*) msg->tp_buf) = tot_len;
639 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
640 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
642 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
643 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: error!! sent state=%d\n", state );
645 if( retries > 0 && state == SI_ERR_BLOCKED ) {
646 if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
648 if( retries > 0 ) { // only if we'll loop through again
649 usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsecond
654 state = 0; // don't loop
657 if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
662 } while( state && retries > 0 );
664 if( msg->state == RMR_OK ) { // successful send
665 if( !(msg->flags & MFL_NOALLOC) ) { // allocate another sendable zc buffer unless told otherwise
666 return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len ); // preallocate a zero-copy buffer and return msg
668 rmr_free_msg( msg ); // not wanting a message back, trash this one
671 } else { // send failed -- return original message
// NOTE(review): 98 is an unexplained magic value (see FIX ME); a named constant would help.
672 if( msg->state == 98 ) { // FIX ME: this is just broken, but needs SI changes to work correctly for us
674 msg->state = RMR_ERR_RETRY; // errno will have nano reason
676 msg->state = RMR_ERR_SENDFAILED;
// NOTE(review): strerror() expects an errno value, but msg->state holds an RMR state here,
// so the text printed is likely misleading (errno may be what was meant).
679 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
686 send message with maximum timeout.
687 Accept a message and send it to an endpoint based on message type.
688 If NNG reports that the send attempt timed out, or should be retried,
689 RMr will retry for approximately max_to microseconds; rounded to the next
692 Allocates a new message buffer for the next send. If a message type has
693 more than one group of endpoints defined, then the message will be sent
694 in round robin fashion to one endpoint in each group.
696 An endpoint will be looked up in the route table using the message type and
697 the subscription id. If the subscription id is "UNSET_SUBID", then only the
698 message type is used. If the initial lookup, with a subid, fails, then a
699 second lookup using just the mtype is tried.
701 When msg->state is not OK, this function must set tp_state in the message as
702 some API functions return the message directly and do not propagate errno into
705 CAUTION: this is a non-blocking send. If the message cannot be sent, then
706 it will return with an error and errno set to eagain. If the send is
707 a limited fanout, then the returned status is the status of the last
// Routes msg using the route table entry for (sub_id, mtype), falling back to an mtype-only
// key. It sends to one round-robin-selected endpoint in each group.
// Because send_msg consumes the buffer, a clone is made before every send except the last.
// The final send lets send_msg re-initialise a buffer, which is returned to the caller.
// Endpoint counters (good/transient/fail) are updated from the send state, and a failing
// endpoint is flagged via uta_ep_failed.
// Errors:
// - NULL ctx/msg: EINVAL (RMR_ERR_BADARG on msg)
// - no header: RMR_ERR_NOHDR / EBADMSG
// - no route: RMR_ERR_NOENDPT / ENXIO
// - invalid socket: RMR_ERR_NOENDPT
// NOTE(review): max_to is treated as a retry count, not a time; it is replaced by
// ctx->send_retries ("convert to retries").
711 static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
712 endpoint_t* ep; // end point that we're attempting to send to
713 rtable_ent_t* rte; // the route table entry which matches the message key
714 int nn_sock; // endpoint socket (fd in si case) for send
716 int group; // selected group to get socket for
717 int send_again; // true if the message must be sent again
718 rmr_mbuf_t* clone_m; // cloned message for an nth send
719 int sock_ok; // got a valid socket from round robin select
721 int ok_sends = 0; // track number of ok sends
723 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
724 errno = EINVAL; // if msg is null, this is their clue
726 msg->state = RMR_ERR_BADARG;
727 errno = EINVAL; // must ensure it's not eagain
728 msg->tp_state = errno;
733 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
734 if( msg->header == NULL ) {
735 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
736 msg->state = RMR_ERR_NOHDR;
737 errno = EBADMSG; // must ensure it's not eagain
738 msg->tp_state = errno;
743 max_to = ctx->send_retries; // convert to retries
746 if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) { // find the entry which matches subid/type allow fallback to type only key
747 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
748 msg->state = RMR_ERR_NOENDPT;
749 errno = ENXIO; // must ensure it's not eagain
750 msg->tp_state = errno;
751 return msg; // caller can resend (maybe) or free
754 send_again = 1; // force loop entry
755 group = 0; // always start with group 0
756 while( send_again ) {
757 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx ); // select endpt from rr group and set again if more groups
759 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
760 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
764 if( sock_ok ) { // with an rte we _should_ always have a socket, but don't bet on it
766 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
767 if( clone_m == NULL ) {
768 msg->state = RMR_ERR_SENDFAILED;
770 msg->tp_state = errno;
771 rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
775 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
776 msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
777 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
779 if( msg != NULL ) { // returned message indicates send error of some sort
780 rmr_free_msg( msg ); // must ditch one; pick msg so we don't have to unfiddle flags
784 msg = clone_m; // clone will be the next to send
787 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
790 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" );
795 if( ep != NULL && msg != NULL ) {
796 switch( msg->state ) {
798 ep->scounts[EPSC_GOOD]++;
802 ep->scounts[EPSC_TRANS]++;
806 ep->scounts[EPSC_FAIL]++;
807 uta_ep_failed( ep ); // sending to ep failed; set up to reconnect
812 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
813 msg->state = RMR_ERR_NOENDPT;
818 if( msg ) { // call functions don't get a buffer back, so a nil check is required
819 msg->flags &= ~MFL_NOALLOC; // must return with this flag off
820 if( ok_sends ) { // multiple rr-groups and one was successful; report ok
824 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
826 msg->tp_state = errno;
829 return msg; // last message carries the status of last/only send attempt
834 A generic wrapper to the real send to keep wormhole stuff agnostic.
835 We assume the wormhole function vetted the buffer so we don't have to.
// Thin wrapper used by the wormhole code: sends msg to ep's socket via send_msg.
// retries is -1, and send_msg retries only while retries > 0, so a blocked send is not retried.
837 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
838 return send_msg( ctx, msg, ep->nn_sock, -1 );