3592a7b1957c8f10816eb0f90cac0977e9aa6eb4
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia 
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly) 
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40 /*
41         Translates the nng state passed in to one of ours that is suitable to put
42         into the message, and sets errno to something that might be useful.
43         If we don't have a specific RMr state, then we return the default (e.g.
44         receive failed).
45 */
46 static inline int xlate_nng_state( int state, int def_state ) {
47
48         switch( state ) {
49                 case 0:
50                         errno = 0;
51                         state = RMR_OK;
52                         break;
53
54                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
55                         state = RMR_ERR_RETRY;
56                         errno = EAGAIN;
57                         break;
58                         
59                 case NNG_ETIMEDOUT:
60                         state = RMR_ERR_RETRY;
61                         errno = EAGAIN;
62                         break;
63
64                 case NNG_ENOTSUP:
65                         errno  = ENOTSUP;
66                         state = def_state;
67                         break;
68
69                 case NNG_EINVAL:
70                         errno  = EINVAL;
71                         state = def_state;
72                         break;
73
74                 case NNG_ENOMEM:
75                         errno  = ENOMEM;
76                         state = def_state;
77                         break;
78
79                 case NNG_ESTATE:
80                         errno  = EBADFD;                                // file des not in a good state for the operation
81                         state = def_state;
82                         break;
83
84                 case NNG_ECLOSED:
85                         errno  = EBADFD;                                // file des not in a good state for the operation
86                         state = def_state;
87                         break;
88                 
89                 default:
90                         errno = EBADE;
91                         state = def_state;
92                         break;
93         }
94
95         return state;
96 }
97
98 /*
99         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
100         a new message struct as well. Size is the size of the zc buffer to allocate (not
101         including our header). If size is 0, then the buffer allocated is the size previously
102         allocated (if msg is !nil) or the default size given at initialisation).
103
104         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
105         the context value is used.
106
107         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
108                 are zero copy, however ONLY the message is zero copy. We now allocate and use
109                 nng messages.
110 */
111 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
112         size_t          mlen;                   // size of the transport buffer that we'll allocate
113         uta_mhdr_t*     hdr;                    // convenience pointer
114         int                     tr_len;                 // trace data len (default or override)
115
116         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
117
118         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
119         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
120
121         if( msg == NULL ) {
122                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
123                 if( msg == NULL ) {
124                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
125                         exit( 1 );
126                 }
127         } else {
128                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
129         }
130
131         memset( msg, 0, sizeof( *msg ) );
132
133         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
134                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
135                 abort( );                                                                                       // toss out a core file for this
136         }
137
138         msg->header = nng_msg_body( msg->tp_buf );
139         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
140         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
141                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
142                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
143                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
144                 //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
145                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );
146         }
147         msg->len = 0;                                                                                   // length of data in the payload
148         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
149         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
150         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
151         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
152         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
153         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
154
155         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
156
157         return msg;
158 }
159
160 /*
161         Allocates only the mbuf and does NOT allocate an underlying transport buffer since 
162         NNG receive must allocate that on its own.
163 */
164 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
165         size_t  mlen;
166         uta_mhdr_t* hdr;                        // convenience pointer
167         rmr_mbuf_t* msg;
168
169         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
170         if( msg == NULL ) {
171                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
172                 exit( 1 );
173         }
174
175         memset( msg, 0, sizeof( *msg ) );
176
177         msg->tp_buf = NULL;
178         msg->header = NULL;
179         msg->len = -1;                                                                                  // no payload; invalid len
180         msg->alloc_len = -1;
181         msg->payload = NULL;
182         msg->xaction = NULL;
183         msg->state = RMR_ERR_UNSET;
184         msg->flags = 0;
185
186         return msg;
187 }
188
189 /*
190         This accepts a message with the assumption that only the tp_buf pointer is valid. It
191         sets all of the various header/payload/xaction pointers in the mbuf to the proper
192         spot in the transport layer buffer.  The len in the header is assumed to be the 
193         allocated len (a receive buffer that nng created);
194
195         The alen parm is the assumed allocated length; assumed because it's a value likely
196         to have come from nng receive and the actual alloc len might be larger, but we
197         can only assume this is the total usable space.
198
199         This function returns the message with an error state set if it detects that the
200         received message might have been truncated.  Check is done here as the calculation
201         is somewhat based on header version.
202 */
203 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
204         uta_mhdr_t* hdr;                                // current header
205         uta_v1mhdr_t* v1hdr;                    // version 1 header
206         int ver;
207         int     hlen;                                           // header len to use for a truncation check
208
209         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
210         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
211
212         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order 
213                 ver = 1;
214                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
215         } else {
216                 ver = ntohl( v1hdr->rmr_ver );
217         }
218
219         switch( ver ) {
220                 case 1:
221                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
222                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
223                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
224
225                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
226                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
227                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
228                         msg->state = RMR_OK;
229                         hlen = sizeof( uta_v1mhdr_t );
230                         break;
231
232                 default:                                                                                                        // current version always lands here
233                         hdr = (uta_mhdr_t *) msg->header;
234                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
235                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
236
237                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
238                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
239                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
240                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
241                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
242                         break;
243         }
244
245         if( msg->len > (msg->alloc_len - hlen ) ) {                                             // more than we should have had room for; error
246                 msg->state = RMR_ERR_TRUNC;
247                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
248         } else {
249                 msg->state = RMR_OK;
250         }
251 }
252
253 /*
254         This will clone a message into a new zero copy buffer and return the cloned message.
255 */
256 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
257         rmr_mbuf_t* nm;                 // new message buffer
258         size_t  mlen;
259         int state;
260         uta_mhdr_t* hdr;
261         uta_v1mhdr_t* v1hdr;
262
263         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
264         if( nm == NULL ) {
265                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
266                 exit( 1 );
267         }
268         memset( nm, 0, sizeof( *nm ) );
269
270         mlen = old_msg->alloc_len;                                                                              // length allocated before
271         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
272                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
273                 exit( 1 );
274         }
275
276         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
277         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
278         switch( ntohl( v1hdr->rmr_ver ) ) {
279                 case 1:
280                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
281                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
282                         break;
283
284                 default:                                                                                        // current message always caught  here
285                         hdr = nm->header;
286                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
287                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
288                         break;
289         }
290                 
291         // --- these are all version agnostic -----------------------------------
292         nm->mtype = old_msg->mtype;
293         nm->len = old_msg->len;                                                                 // length of data in the payload
294         nm->alloc_len = mlen;                                                                   // length of allocated payload
295
296         nm->xaction = hdr->xid;                                                                 // reference xaction
297         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
298         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
299         memcpy( nm->payload, old_msg->payload, old_msg->len );
300
301         return nm;
302 }
303
304 /*
305         This will clone a message with a change to the trace area in the header such that
306         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
307         The orignal message will be left unchanged, and a pointer to the new message is returned.
308         It is not possible to realloc buffers and change the data sizes.
309 */
310 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
311         rmr_mbuf_t* nm;                 // new message buffer
312         size_t  mlen;
313         int state;
314         uta_mhdr_t* hdr;
315         uta_v1mhdr_t* v1hdr;
316         int     tr_old_len;                     // tr size in new buffer
317         int     coffset;                        // an offset to something in the header for copy
318
319         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
320         if( nm == NULL ) {
321                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
322                 exit( 1 );
323         }
324         memset( nm, 0, sizeof( *nm ) );
325
326         hdr = old_msg->header;
327         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
328
329         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
330         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
331         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
332                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
333                 exit( 1 );
334         }
335
336         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
337         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
338         switch( ntohl( v1hdr->rmr_ver ) ) {
339                 case 1:
340                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
341                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
342                         break;
343
344                 default:                                                                                        // current message always caught  here
345                         hdr = nm->header;
346                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data might have changed
347                         if( RMR_D1_LEN( hdr )  ) {
348                                 coffset = DATA1_OFFSET( hdr );                                                                                          // offset to d1
349                                 memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
350                         
351                         }
352                         if( RMR_D2_LEN( hdr )  ) {
353                                 coffset = DATA2_OFFSET( hdr );                                                                                          // offset to d2
354                                 memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) );          // copy data2 and data2 if necessary
355                         }
356
357                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // MUST set before pointing payload
358                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
359                         SET_HDR_TR_LEN( hdr, tr_len );                                                                          // do NOT copy old trace data, just set the new header
360                         break;
361         }
362                 
363         // --- these are all version agnostic -----------------------------------
364         nm->mtype = old_msg->mtype;
365         nm->len = old_msg->len;                                                                 // length of data in the payload
366         nm->alloc_len = mlen;                                                                   // length of allocated payload
367
368         nm->xaction = hdr->xid;                                                                 // reference xaction
369         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
370         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
371         memcpy( nm->payload, old_msg->payload, old_msg->len );
372
373         return nm;
374 }
375
376 /*
377         This is the receive work horse used by the outer layer receive functions.
378         It waits for a message to be received on our listen socket. If old msg
379         is passed in, the we assume we can use it instead of allocating a new
380         one, else a new block of memory is allocated.
381
382         This allocates a zero copy message so that if the user wishes to call
383         rmr_rts_msg() the send is zero copy.
384
385         The nng timeout on send is at the ms level which is a tad too long for
386         our needs.  So, if NNG returns eagain or timedout (we don't set one)
387         we will loop up to 5 times with a 10 microsecond delay between each
388         attempt.  If at the end of this set of retries NNG is still saying
389         eagain/timeout we'll return to the caller with that set in errno.
390         Right now this is only for zero-copy buffers (they should all be zc
391         buffers now).
392
393
394         In the NNG msg world it must allocate the receive buffer rather
395         than accepting one that we allocated from their space and could 
396         reuse.  They have their reasons I guess.  Thus, we will free
397         the old transport buffer if user passes the message in; at least
398         our mbuf will be reused. 
399 */
400 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
401         int state;
402         rmr_mbuf_t*     msg = NULL;             // msg received
403         uta_mhdr_t* hdr;
404         size_t  rsize;                          // nng needs to write back the size received... grrr
405
406         if( old_msg ) {
407                 msg = old_msg;
408                 if( msg->tp_buf != NULL ) {
409                         nng_msg_free( msg->tp_buf );
410                 }
411
412                 msg->tp_buf = NULL;
413         } else {
414                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
415         }
416
417         msg->alloc_len = 0;
418         msg->len = 0;
419         msg->payload = NULL;
420         msg->xaction = NULL;
421
422         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
423         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
424                 return msg;
425         }
426
427         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
428                 msg->state = RMR_ERR_EMPTY;
429                 return msg;
430         }
431
432         rsize = nng_msg_len( msg->tp_buf );
433         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
434                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
435                 hdr = (uta_mhdr_t *) msg->header;
436                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
437
438                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
439                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
440         } else {
441                 msg->state = RMR_ERR_EMPTY;
442                 msg->len = 0;
443                 msg->alloc_len = rsize;
444                 msg->payload = NULL;
445                 msg->xaction = NULL;
446                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
447                 msg->mtype = -1;
448         }
449
450         return msg;
451 }
452
453 /*
454         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
455         message buffer cannot be used to send, and the length information may or may 
456         not be correct (it is set to the length received which might be more than the
457         bytes actually in the payload).
458
459         Mostly this supports the route table collector, but could be extended with an 
460         API external function.
461 */
462 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
463         int state;
464         rmr_mbuf_t*     msg = NULL;             // msg received
465         size_t  rsize;                          // nng needs to write back the size received... grrr
466
467         if( old_msg ) {
468                 msg = old_msg;
469         } else {
470                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
471         }
472
473         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
474         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
475                 return msg;
476         }
477         rsize = nng_msg_len( msg->tp_buf );
478
479         // do NOT use ref_tpbuf() here! Must fill these in manually.
480         msg->header = nng_msg_body( msg->tp_buf );
481         msg->len = rsize;                                                       // len is the number of bytes received
482         msg->alloc_len = rsize;
483         msg->mtype = -1;                                                        // raw message has no type
484         msg->state = RMR_OK;
485         msg->flags = MFL_RAW;
486         msg->payload = msg->header;                                     // payload is the whole thing; no header
487         msg->xaction = NULL;
488
489         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
490
491         return msg;
492 }
493
494 /*
495         This does the hard work of actually sending the message to the given socket. On success,
496         a new message struct is returned. On error, the original msg is returned with the state 
497         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
498         buffer will not be allocated and returned (mostly for call() interal processing since
499         the return message from call() is a received buffer, not a new one).
500
501         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
502         validation has been done prior.
503 */
504 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
505         int state;
506         uta_mhdr_t*     hdr;
507         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
508         int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
509         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
510
511         // future: ensure that application did not overrun the XID buffer; last byte must be 0
512
513         hdr = (uta_mhdr_t *) msg->header;
514         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
515         hdr->plen = htonl( msg->len );
516         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
517
518         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
519                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
520         }
521
522         errno = 0;
523         msg->state = RMR_OK;
524         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
525                 do {
526                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
527                                 msg->state = state;
528                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
529                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
530                                                 retries--;
531                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
532                                                 spin_retries = 1000;
533                                         }
534                                 } else {
535                                         state = 0;                      // don't loop
536                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
537                                 }
538                         } else {
539                                 state = 0;
540                                 msg->state = RMR_OK;
541                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
542                                 msg->tp_buf = NULL;
543                                 hdr = NULL;
544                         }
545                 } while( state && retries > 0 );
546         } else {
547                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
548                 msg->state = RMR_ERR_SENDFAILED;
549                 errno = ENOTSUP;
550                 return msg;
551                 /*
552                 NOT SUPPORTED
553                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
554                         msg->state = state;
555                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
556                 } 
557                 */
558         }
559
560         if( msg->state == RMR_OK ) {                                                                    // successful send
561                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
562                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
563                 } else {
564                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
565                         return NULL;
566                 }
567         } else {                                                                                        // send failed -- return original message
568                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
569                         errno = EAGAIN;
570                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
571                 } else {
572                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
573                 }
574
575                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
576         }
577
578         return msg;
579 }
580
581 /*
582         A generic wrapper to the real send to keep wormhole stuff agnostic.
583         We assume the wormhole function vetted the buffer so we don't have to.
584 */
585 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
586         return send_msg( ctx, msg, ep->nn_sock, -1 );
587 }
588
589 #endif