c7bd04942885a44ed49b22336a1f36ab5eff8d04
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly) 
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40 /*
41         Translates the nng state passed in to one of ours that is suitable to put
42         into the message, and sets errno to something that might be useful.
43         If we don't have a specific RMr state, then we return the default (e.g.
44         receive failed).
45 */
46 static inline int xlate_nng_state( int state, int def_state ) {
47
48         switch( state ) {
49                 case 0:
50                         errno = 0;
51                         state = RMR_OK;
52                         break;
53
54                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
55                         state = RMR_ERR_RETRY;
56                         errno = EAGAIN;
57                         break;
58                         
59                 case NNG_ETIMEDOUT:
60                         state = RMR_ERR_RETRY;
61                         errno = EAGAIN;
62                         break;
63
64                 case NNG_ENOTSUP:
65                         errno  = ENOTSUP;
66                         state = def_state;
67                         break;
68
69                 case NNG_EINVAL:
70                         errno  = EINVAL;
71                         state = def_state;
72                         break;
73
74                 case NNG_ENOMEM:
75                         errno  = ENOMEM;
76                         state = def_state;
77                         break;
78
79                 case NNG_ESTATE:
80                         errno  = EBADFD;                                // file des not in a good state for the operation
81                         state = def_state;
82                         break;
83
84                 case NNG_ECLOSED:
85                         errno  = EBADFD;                                // file des not in a good state for the operation
86                         state = def_state;
87                         break;
88                 
89                 default:
90                         errno = EBADE;
91                         state = def_state;
92                         break;
93         }
94
95         return state;
96 }
97
98 /*
99         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
100         a new message struct as well. Size is the size of the zc buffer to allocate (not
101         including our header). If size is 0, then the buffer allocated is the size previously
102         allocated (if msg is !nil) or the default size given at initialisation).
103
104         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
105         the context value is used.
106
107         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
108                 are zero copy, however ONLY the message is zero copy. We now allocate and use
109                 nng messages.
110 */
111 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
112         size_t          mlen;                   // size of the transport buffer that we'll allocate
113         uta_mhdr_t*     hdr;                    // convenience pointer
114         int                     tr_len;                 // trace data len (default or override)
115
116         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
117
118         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
119         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
120
121         if( msg == NULL ) {
122                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
123                 if( msg == NULL ) {
124                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
125                         exit( 1 );
126                 }
127         } else {
128                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
129         }
130
131         memset( msg, 0, sizeof( *msg ) );
132
133         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
134                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
135                 abort( );                                                                                       // toss out a core file for this
136         }
137
138         msg->header = nng_msg_body( msg->tp_buf );
139         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
140         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
141                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
142                 hdr->sub_id = htonl( UNSET_SUBID );
143                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
144                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
145                 //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
146                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );
147         }
148         msg->len = 0;                                                                                   // length of data in the payload
149         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
150         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
151         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
152         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
153         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
154         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
155
156         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
157
158         return msg;
159 }
160
161 /*
162         Allocates only the mbuf and does NOT allocate an underlying transport buffer since 
163         NNG receive must allocate that on its own.
164 */
165 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
166         size_t  mlen;
167         uta_mhdr_t* hdr;                        // convenience pointer
168         rmr_mbuf_t* msg;
169
170         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
171         if( msg == NULL ) {
172                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
173                 exit( 1 );
174         }
175
176         memset( msg, 0, sizeof( *msg ) );
177
178         msg->tp_buf = NULL;
179         msg->header = NULL;
180         msg->len = -1;                                                                                  // no payload; invalid len
181         msg->alloc_len = -1;
182         msg->payload = NULL;
183         msg->xaction = NULL;
184         msg->state = RMR_ERR_UNSET;
185         msg->flags = 0;
186
187         return msg;
188 }
189
190 /*
191         This accepts a message with the assumption that only the tp_buf pointer is valid. It
192         sets all of the various header/payload/xaction pointers in the mbuf to the proper
193         spot in the transport layer buffer.  The len in the header is assumed to be the 
194         allocated len (a receive buffer that nng created);
195
196         The alen parm is the assumed allocated length; assumed because it's a value likely
197         to have come from nng receive and the actual alloc len might be larger, but we
198         can only assume this is the total usable space.
199
200         This function returns the message with an error state set if it detects that the
201         received message might have been truncated.  Check is done here as the calculation
202         is somewhat based on header version.
203 */
204 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
205         uta_mhdr_t* hdr;                                // current header
206         uta_v1mhdr_t* v1hdr;                    // version 1 header
207         int ver;
208         int     hlen;                                           // header len to use for a truncation check
209
210         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
211         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
212
213         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order 
214                 ver = 1;
215                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
216         } else {
217                 ver = ntohl( v1hdr->rmr_ver );
218         }
219
220         switch( ver ) {
221                 case 1:
222                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
223                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
224                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
225
226                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
227                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
228                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
229                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
230                         msg->state = RMR_OK;
231                         hlen = sizeof( uta_v1mhdr_t );
232                         break;
233
234                 default:                                                                                                        // current version always lands here
235                         hdr = (uta_mhdr_t *) msg->header;
236                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
237                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
238
239                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
240                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
241                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
242                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
243                         msg->sub_id = ntohl( hdr->sub_id );
244                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
245                         break;
246         }
247
248         if( msg->len > (msg->alloc_len - hlen ) ) {                                             // more than we should have had room for; error
249                 msg->state = RMR_ERR_TRUNC;
250                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
251         } else {
252                 msg->state = RMR_OK;
253         }
254 }
255
256 /*
257         This will clone a message into a new zero copy buffer and return the cloned message.
258 */
259 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
260         rmr_mbuf_t* nm;                 // new message buffer
261         size_t  mlen;
262         int state;
263         uta_mhdr_t* hdr;
264         uta_v1mhdr_t* v1hdr;
265
266         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
267         if( nm == NULL ) {
268                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
269                 exit( 1 );
270         }
271         memset( nm, 0, sizeof( *nm ) );
272
273         mlen = old_msg->alloc_len;                                                                              // length allocated before
274         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
275                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
276                 exit( 1 );
277         }
278
279         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
280         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
281         switch( ntohl( v1hdr->rmr_ver ) ) {
282                 case 1:
283                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
284                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
285                         break;
286
287                 default:                                                                                        // current message always caught  here
288                         hdr = nm->header;
289                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
290                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
291                         break;
292         }
293                 
294         // --- these are all version agnostic -----------------------------------
295         nm->mtype = old_msg->mtype;
296         nm->sub_id = old_msg->sub_id;
297         nm->len = old_msg->len;                                                                 // length of data in the payload
298         nm->alloc_len = mlen;                                                                   // length of allocated payload
299
300         nm->xaction = hdr->xid;                                                                 // reference xaction
301         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
302         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
303         memcpy( nm->payload, old_msg->payload, old_msg->len );
304
305         return nm;
306 }
307
308 /*
309         This will clone a message with a change to the trace area in the header such that
310         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
311         The orignal message will be left unchanged, and a pointer to the new message is returned.
312         It is not possible to realloc buffers and change the data sizes.
313 */
314 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
315         rmr_mbuf_t* nm;                 // new message buffer
316         size_t  mlen;
317         int state;
318         uta_mhdr_t* hdr;
319         uta_v1mhdr_t* v1hdr;
320         int     tr_old_len;                     // tr size in new buffer
321         int     coffset;                        // an offset to something in the header for copy
322
323         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
324         if( nm == NULL ) {
325                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
326                 exit( 1 );
327         }
328         memset( nm, 0, sizeof( *nm ) );
329
330         hdr = old_msg->header;
331         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
332
333         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
334         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
335         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
336                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
337                 exit( 1 );
338         }
339
340         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
341         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
342         switch( ntohl( v1hdr->rmr_ver ) ) {
343                 case 1:
344                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
345                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
346                         break;
347
348                 default:                                                                                        // current message always caught  here
349                         hdr = nm->header;
350                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data might have changed
351                         if( RMR_D1_LEN( hdr )  ) {
352                                 coffset = DATA1_OFFSET( hdr );                                                                                          // offset to d1
353                                 memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
354                         
355                         }
356                         if( RMR_D2_LEN( hdr )  ) {
357                                 coffset = DATA2_OFFSET( hdr );                                                                                          // offset to d2
358                                 memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) );          // copy data2 and data2 if necessary
359                         }
360
361                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // MUST set before pointing payload
362                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
363                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // do NOT copy old trace data, just set the new header
364                         break;
365         }
366                 
367         // --- these are all version agnostic -----------------------------------
368         nm->mtype = old_msg->mtype;
369         nm->sub_id = old_msg->sub_id;
370         nm->len = old_msg->len;                                                                 // length of data in the payload
371         nm->alloc_len = mlen;                                                                   // length of allocated payload
372
373         nm->xaction = hdr->xid;                                                                 // reference xaction
374         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
375         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
376         memcpy( nm->payload, old_msg->payload, old_msg->len );
377
378         return nm;
379 }
380
381 /*
382         This is the receive work horse used by the outer layer receive functions.
383         It waits for a message to be received on our listen socket. If old msg
384         is passed in, the we assume we can use it instead of allocating a new
385         one, else a new block of memory is allocated.
386
387         This allocates a zero copy message so that if the user wishes to call
388         rmr_rts_msg() the send is zero copy.
389
390         The nng timeout on send is at the ms level which is a tad too long for
391         our needs.  So, if NNG returns eagain or timedout (we don't set one)
392         we will loop up to 5 times with a 10 microsecond delay between each
393         attempt.  If at the end of this set of retries NNG is still saying
394         eagain/timeout we'll return to the caller with that set in errno.
395         Right now this is only for zero-copy buffers (they should all be zc
396         buffers now).
397
398
399         In the NNG msg world it must allocate the receive buffer rather
400         than accepting one that we allocated from their space and could 
401         reuse.  They have their reasons I guess.  Thus, we will free
402         the old transport buffer if user passes the message in; at least
403         our mbuf will be reused. 
404 */
405 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
406         int state;
407         rmr_mbuf_t*     msg = NULL;             // msg received
408         uta_mhdr_t* hdr;
409         size_t  rsize;                          // nng needs to write back the size received... grrr
410
411         if( old_msg ) {
412                 msg = old_msg;
413                 if( msg->tp_buf != NULL ) {
414                         nng_msg_free( msg->tp_buf );
415                 }
416
417                 msg->tp_buf = NULL;
418         } else {
419                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
420         }
421
422         msg->alloc_len = 0;
423         msg->len = 0;
424         msg->payload = NULL;
425         msg->xaction = NULL;
426
427         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
428         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
429                 return msg;
430         }
431
432         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
433                 msg->state = RMR_ERR_EMPTY;
434                 return msg;
435         }
436
437         rsize = nng_msg_len( msg->tp_buf );
438         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
439                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
440                 hdr = (uta_mhdr_t *) msg->header;
441                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
442
443                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
444                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
445         } else {
446                 msg->state = RMR_ERR_EMPTY;
447                 msg->len = 0;
448                 msg->alloc_len = rsize;
449                 msg->payload = NULL;
450                 msg->xaction = NULL;
451                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
452                 msg->mtype = UNSET_MSGTYPE;
453                 msg->sub_id = UNSET_SUBID;
454         }
455
456         return msg;
457 }
458
459 /*
460         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
461         message buffer cannot be used to send, and the length information may or may 
462         not be correct (it is set to the length received which might be more than the
463         bytes actually in the payload).
464
465         Mostly this supports the route table collector, but could be extended with an 
466         API external function.
467 */
468 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
469         int state;
470         rmr_mbuf_t*     msg = NULL;             // msg received
471         size_t  rsize;                          // nng needs to write back the size received... grrr
472
473         if( old_msg ) {
474                 msg = old_msg;
475         } else {
476                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
477         }
478
479         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
480         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
481                 return msg;
482         }
483         rsize = nng_msg_len( msg->tp_buf );
484
485         // do NOT use ref_tpbuf() here! Must fill these in manually.
486         msg->header = nng_msg_body( msg->tp_buf );
487         msg->len = rsize;                                                       // len is the number of bytes received
488         msg->alloc_len = rsize;
489         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
490         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
491         msg->state = RMR_OK;
492         msg->flags = MFL_RAW;
493         msg->payload = msg->header;                                     // payload is the whole thing; no header
494         msg->xaction = NULL;
495
496         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
497
498         return msg;
499 }
500
501 /*
502         This does the hard work of actually sending the message to the given socket. On success,
503         a new message struct is returned. On error, the original msg is returned with the state 
504         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
505         buffer will not be allocated and returned (mostly for call() interal processing since
506         the return message from call() is a received buffer, not a new one).
507
508         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
509         validation has been done prior.
510 */
511 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
512         int state;
513         uta_mhdr_t*     hdr;
514         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
515         int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
516         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
517
518         // future: ensure that application did not overrun the XID buffer; last byte must be 0
519
520         hdr = (uta_mhdr_t *) msg->header;
521         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
522         hdr->sub_id = htonl( msg->sub_id );
523         hdr->plen = htonl( msg->len );
524         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
525
526         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
527                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
528         }
529
530         errno = 0;
531         msg->state = RMR_OK;
532         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
533                 do {
534                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
535                                 msg->state = state;
536                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
537                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
538                                                 retries--;
539                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
540                                                 spin_retries = 1000;
541                                         }
542                                 } else {
543                                         state = 0;                      // don't loop
544                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
545                                 }
546                         } else {
547                                 state = 0;
548                                 msg->state = RMR_OK;
549                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
550                                 msg->tp_buf = NULL;
551                                 hdr = NULL;
552                         }
553                 } while( state && retries > 0 );
554         } else {
555                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
556                 msg->state = RMR_ERR_SENDFAILED;
557                 errno = ENOTSUP;
558                 return msg;
559                 /*
560                 NOT SUPPORTED
561                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
562                         msg->state = state;
563                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
564                 } 
565                 */
566         }
567
568         if( msg->state == RMR_OK ) {                                                                    // successful send
569                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
570                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
571                 } else {
572                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
573                         return NULL;
574                 }
575         } else {                                                                                        // send failed -- return original message
576                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
577                         errno = EAGAIN;
578                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
579                 } else {
580                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
581                 }
582
583                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
584         }
585
586         return msg;
587 }
588
589 /*
590         A generic wrapper to the real send to keep wormhole stuff agnostic.
591         We assume the wormhole function vetted the buffer so we don't have to.
592 */
593 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
594         return send_msg( ctx, msg, ep->nn_sock, -1 );
595 }
596
597 #endif