da4dfbd45ae5fbc645a0e97c771e47908fd72b7c
[ric-plt/lib/rmr.git] / src / rmr / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40 /*
41         Translates the nng state passed in to one of ours that is suitable to put
42         into the message, and sets errno to something that might be useful.
43         If we don't have a specific RMr state, then we return the default (e.g.
44         receive failed).
45 */
46 static inline int xlate_nng_state( int state, int def_state ) {
47
48         switch( state ) {
49                 case 0:
50                         errno = 0;
51                         state = RMR_OK;
52                         break;
53
54                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
55                         state = RMR_ERR_RETRY;
56                         errno = EAGAIN;
57                         break;
58
59                 case NNG_ETIMEDOUT:
60                         state = RMR_ERR_RETRY;
61                         errno = EAGAIN;
62                         break;
63
64                 case NNG_ENOTSUP:
65                         errno  = ENOTSUP;
66                         state = def_state;
67                         break;
68
69                 case NNG_EINVAL:
70                         errno  = EINVAL;
71                         state = def_state;
72                         break;
73
74                 case NNG_ENOMEM:
75                         errno  = ENOMEM;
76                         state = def_state;
77                         break;
78
79                 case NNG_ESTATE:
80                         errno  = EBADFD;                                // file des not in a good state for the operation
81                         state = def_state;
82                         break;
83
84                 case NNG_ECLOSED:
85                         errno  = EBADFD;                                // file des not in a good state for the operation
86                         state = def_state;
87                         break;
88
89                 default:
90                         errno = EBADE;
91                         state = def_state;
92                         break;
93         }
94
95         return state;
96 }
97
98 /*
99         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
100         a new message struct as well. Size is the size of the zc buffer to allocate (not
101         including our header). If size is 0, then the buffer allocated is the size previously
102         allocated (if msg is !nil) or the default size given at initialisation).
103
104         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
105         the context value is used.
106
107         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
108                 are zero copy, however ONLY the message is zero copy. We now allocate and use
109                 nng messages.
110 */
111 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
112         size_t          mlen;                   // size of the transport buffer that we'll allocate
113         uta_mhdr_t*     hdr;                    // convenience pointer
114         int                     tr_len;                 // trace data len (default or override)
115
116         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
117
118         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
119         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
120
121         if( msg == NULL ) {
122                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
123                 if( msg == NULL ) {
124                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
125                         exit( 1 );
126                 }
127         } else {
128                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
129         }
130
131         memset( msg, 0, sizeof( *msg ) );
132
133         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
134                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
135                 abort( );                                                                                       // toss out a core file for this
136         }
137
138         msg->header = nng_msg_body( msg->tp_buf );
139         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
140         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
141                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
142                 hdr->sub_id = htonl( UNSET_SUBID );
143                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
144                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
145                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
146                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
147         }
148         msg->len = 0;                                                                                   // length of data in the payload
149         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
150         msg->sub_id = UNSET_SUBID;
151         msg->mtype = UNSET_MSGTYPE;
152         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
153         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
154         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
155         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
156         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
157
158         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
159
160         return msg;
161 }
162
163 /*
164         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
165         NNG receive must allocate that on its own.
166 */
167 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
168         size_t  mlen;
169         uta_mhdr_t* hdr;                        // convenience pointer
170         rmr_mbuf_t* msg;
171
172         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
173         if( msg == NULL ) {
174                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
175                 exit( 1 );
176         }
177
178         memset( msg, 0, sizeof( *msg ) );
179
180         msg->sub_id = UNSET_SUBID;
181         msg->mtype = UNSET_MSGTYPE;
182         msg->tp_buf = NULL;
183         msg->header = NULL;
184         msg->len = -1;                                                                                  // no payload; invalid len
185         msg->alloc_len = -1;
186         msg->payload = NULL;
187         msg->xaction = NULL;
188         msg->state = RMR_ERR_UNSET;
189         msg->flags = 0;
190
191         return msg;
192 }
193
194 /*
195         This accepts a message with the assumption that only the tp_buf pointer is valid. It
196         sets all of the various header/payload/xaction pointers in the mbuf to the proper
197         spot in the transport layer buffer.  The len in the header is assumed to be the
198         allocated len (a receive buffer that nng created);
199
200         The alen parm is the assumed allocated length; assumed because it's a value likely
201         to have come from nng receive and the actual alloc len might be larger, but we
202         can only assume this is the total usable space.
203
204         This function returns the message with an error state set if it detects that the
205         received message might have been truncated.  Check is done here as the calculation
206         is somewhat based on header version.
207 */
208 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
209         uta_mhdr_t* hdr;                                // current header
210         uta_v1mhdr_t* v1hdr;                    // version 1 header
211         int ver;
212         int     hlen;                                           // header len to use for a truncation check
213
214         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
215         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
216
217         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
218                 ver = 1;
219                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
220         } else {
221                 ver = ntohl( v1hdr->rmr_ver );
222         }
223
224         switch( ver ) {
225                 case 1:
226                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
227                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
228                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
229
230                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
231                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
232                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
233                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
234                         msg->state = RMR_OK;
235                         hlen = sizeof( uta_v1mhdr_t );
236                         break;
237
238                 default:                                                                                                        // current version always lands here
239                         hdr = (uta_mhdr_t *) msg->header;
240                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
241                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
242
243                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
244                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
245                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
246                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
247                         msg->sub_id = ntohl( hdr->sub_id );
248                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
249                         break;
250         }
251
252         if( msg->len > (msg->alloc_len - hlen ) ) {                                             // more than we should have had room for; error
253                 msg->state = RMR_ERR_TRUNC;
254                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
255         } else {
256                 msg->state = RMR_OK;
257         }
258 }
259
260 /*
261         This will clone a message into a new zero copy buffer and return the cloned message.
262 */
263 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
264         rmr_mbuf_t* nm;                 // new message buffer
265         size_t  mlen;
266         int state;
267         uta_mhdr_t* hdr;
268         uta_v1mhdr_t* v1hdr;
269
270         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
271         if( nm == NULL ) {
272                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
273                 exit( 1 );
274         }
275         memset( nm, 0, sizeof( *nm ) );
276
277         mlen = old_msg->alloc_len;                                                                              // length allocated before
278         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
279                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
280                 exit( 1 );
281         }
282
283         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
284         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
285         switch( ntohl( v1hdr->rmr_ver ) ) {
286                 case 1:
287                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
288                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
289                         break;
290
291                 default:                                                                                        // current message always caught  here
292                         hdr = nm->header;
293                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
294                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
295                         break;
296         }
297
298         // --- these are all version agnostic -----------------------------------
299         nm->mtype = old_msg->mtype;
300         nm->sub_id = old_msg->sub_id;
301         nm->len = old_msg->len;                                                                 // length of data in the payload
302         nm->alloc_len = mlen;                                                                   // length of allocated payload
303
304         nm->xaction = hdr->xid;                                                                 // reference xaction
305         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
306         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
307         memcpy( nm->payload, old_msg->payload, old_msg->len );
308
309         return nm;
310 }
311
312 /*
313         This will clone a message with a change to the trace area in the header such that
314         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
315         The orignal message will be left unchanged, and a pointer to the new message is returned.
316         It is not possible to realloc buffers and change the data sizes.
317 */
318 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
319         rmr_mbuf_t* nm;                 // new message buffer
320         size_t  mlen;
321         int state;
322         uta_mhdr_t* hdr;
323         uta_v1mhdr_t* v1hdr;
324         int     tr_old_len;                     // tr size in new buffer
325
326         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
327         if( nm == NULL ) {
328                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
329                 exit( 1 );
330         }
331         memset( nm, 0, sizeof( *nm ) );
332
333         hdr = old_msg->header;
334         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
335
336         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
337         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
338         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
339                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
340                 exit( 1 );
341         }
342
343         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
344         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
345         switch( ntohl( v1hdr->rmr_ver ) ) {
346                 case 1:
347                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
348                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
349                         break;
350
351                 default:                                                                                        // current message version always caught  here
352                         hdr = nm->header;
353                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
354                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
355
356                         if( RMR_D1_LEN( hdr )  ) {
357                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
358                         }
359                         if( RMR_D2_LEN( hdr )  ) {
360                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
361                         }
362
363                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
364                         break;
365         }
366
367         // --- these are all version agnostic -----------------------------------
368         nm->mtype = old_msg->mtype;
369         nm->sub_id = old_msg->sub_id;
370         nm->len = old_msg->len;                                                                 // length of data in the payload
371         nm->alloc_len = mlen;                                                                   // length of allocated payload
372
373         nm->xaction = hdr->xid;                                                                 // reference xaction
374         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
375         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
376         memcpy( nm->payload, old_msg->payload, old_msg->len );
377
378         return nm;
379 }
380
381 /*
382         This is the receive work horse used by the outer layer receive functions.
383         It waits for a message to be received on our listen socket. If old msg
384         is passed in, the we assume we can use it instead of allocating a new
385         one, else a new block of memory is allocated.
386
387         This allocates a zero copy message so that if the user wishes to call
388         rmr_rts_msg() the send is zero copy.
389
390         The nng timeout on send is at the ms level which is a tad too long for
391         our needs.  So, if NNG returns eagain or timedout (we don't set one)
392         we will loop up to 5 times with a 10 microsecond delay between each
393         attempt.  If at the end of this set of retries NNG is still saying
394         eagain/timeout we'll return to the caller with that set in errno.
395         Right now this is only for zero-copy buffers (they should all be zc
396         buffers now).
397
398
399         In the NNG msg world it must allocate the receive buffer rather
400         than accepting one that we allocated from their space and could
401         reuse.  They have their reasons I guess.  Thus, we will free
402         the old transport buffer if user passes the message in; at least
403         our mbuf will be reused.
404 */
405 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
406         int state;
407         rmr_mbuf_t*     msg = NULL;             // msg received
408         uta_mhdr_t* hdr;
409         size_t  rsize;                          // nng needs to write back the size received... grrr
410
411         if( old_msg ) {
412                 msg = old_msg;
413                 if( msg->tp_buf != NULL ) {
414                         nng_msg_free( msg->tp_buf );
415                 }
416
417                 msg->tp_buf = NULL;
418         } else {
419                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
420         }
421
422         msg->alloc_len = 0;
423         msg->len = 0;
424         msg->payload = NULL;
425         msg->xaction = NULL;
426         msg->tp_buf = NULL;
427
428         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
429         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
430                 return msg;
431         }
432
433         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
434                 msg->state = RMR_ERR_EMPTY;
435                 return msg;
436         }
437
438         rsize = nng_msg_len( msg->tp_buf );
439         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
440                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
441                 hdr = (uta_mhdr_t *) msg->header;
442                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
443
444                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
445                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
446         } else {
447                 msg->state = RMR_ERR_EMPTY;
448                 msg->len = 0;
449                 msg->alloc_len = rsize;
450                 msg->payload = NULL;
451                 msg->xaction = NULL;
452                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
453                 msg->mtype = UNSET_MSGTYPE;
454                 msg->sub_id = UNSET_SUBID;
455         }
456
457         return msg;
458 }
459
460 /*
461         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
462         message buffer cannot be used to send, and the length information may or may
463         not be correct (it is set to the length received which might be more than the
464         bytes actually in the payload).
465
466         Mostly this supports the route table collector, but could be extended with an
467         API external function.
468 */
469 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
470         int state;
471         rmr_mbuf_t*     msg = NULL;             // msg received
472         size_t  rsize;                          // nng needs to write back the size received... grrr
473
474         if( old_msg ) {
475                 msg = old_msg;
476         } else {
477                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
478         }
479
480         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
481         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
482                 return msg;
483         }
484         rsize = nng_msg_len( msg->tp_buf );
485
486         // do NOT use ref_tpbuf() here! Must fill these in manually.
487         msg->header = nng_msg_body( msg->tp_buf );
488         msg->len = rsize;                                                       // len is the number of bytes received
489         msg->alloc_len = rsize;
490         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
491         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
492         msg->state = RMR_OK;
493         msg->flags = MFL_RAW;
494         msg->payload = msg->header;                                     // payload is the whole thing; no header
495         msg->xaction = NULL;
496
497         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
498
499         return msg;
500 }
501
502 /*
503         This does the hard work of actually sending the message to the given socket. On success,
504         a new message struct is returned. On error, the original msg is returned with the state
505         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
506         buffer will not be allocated and returned (mostly for call() interal processing since
507         the return message from call() is a received buffer, not a new one).
508
509         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
510         validation has been done prior.
511 */
512 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
513         int state;
514         uta_mhdr_t*     hdr;
515         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
516         int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
517         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
518
519         // future: ensure that application did not overrun the XID buffer; last byte must be 0
520
521         hdr = (uta_mhdr_t *) msg->header;
522         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
523         hdr->sub_id = htonl( msg->sub_id );
524         hdr->plen = htonl( msg->len );
525         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
526
527         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
528                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
529         }
530
531         errno = 0;
532         msg->state = RMR_OK;
533         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
534                 do {
535                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
536                                 msg->state = state;
537                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
538                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
539                                                 retries--;
540                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
541                                                 spin_retries = 1000;
542                                         }
543                                 } else {
544                                         state = 0;                      // don't loop
545                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
546                                 }
547                         } else {
548                                 state = 0;
549                                 msg->state = RMR_OK;
550                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
551                                 msg->tp_buf = NULL;
552                                 hdr = NULL;
553                         }
554                 } while( state && retries > 0 );
555         } else {
556                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
557                 msg->state = RMR_ERR_SENDFAILED;
558                 errno = ENOTSUP;
559                 return msg;
560                 /*
561                 NOT SUPPORTED
562                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
563                         msg->state = state;
564                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
565                 }
566                 */
567         }
568
569         if( msg->state == RMR_OK ) {                                                                    // successful send
570                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
571                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
572                 } else {
573                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
574                         return NULL;
575                 }
576         } else {                                                                                        // send failed -- return original message
577                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
578                         errno = EAGAIN;
579                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
580                 } else {
581                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
582                 }
583
584                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
585         }
586
587         return msg;
588 }
589
590 /*
591         send message with maximum timeout.
592         Accept a message and send it to an endpoint based on message type.
593         If NNG reports that the send attempt timed out, or should be retried,
594         RMr will retry for approximately max_to microseconds; rounded to the next
595         higher value of 10.
596
597         Allocates a new message buffer for the next send. If a message type has
598         more than one group of endpoints defined, then the message will be sent
599         in round robin fashion to one endpoint in each group.
600
601         An endpoint will be looked up in the route table using the message type and
602         the subscription id. If the subscription id is "UNSET_SUBID", then only the
603         message type is used.  If the initial lookup, with a subid, fails, then a
604         second lookup using just the mtype is tried.
605
606         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
607                 it will return with an error and errno set to eagain. If the send is
608                 a limited fanout, then the returned status is the status of the last
609                 send attempt.
610
611 */
612 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
613         nng_socket      nn_sock;                        // endpoint socket for send
614         uta_ctx_t*      ctx;
615         int                     group;                          // selected group to get socket for
616         int                     send_again;                     // true if the message must be sent again
617         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
618         int                     sock_ok;                        // got a valid socket from round robin select
619         uint64_t         key;                           // mtype or sub-id/mtype sym table key
620         int                     altk_ok = 0;            // set true if we can lookup on alternate key if mt/sid lookup fails
621         char*           d1;
622
623         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
624                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
625                 if( msg != NULL ) {
626                         msg->state = RMR_ERR_BADARG;
627                         errno = EINVAL;                                                                                 // must ensure it's not eagain
628                 }
629                 return msg;
630         }
631
632         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
633         if( msg->header == NULL ) {
634                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
635                 msg->state = RMR_ERR_NOHDR;
636                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
637                 return msg;
638         }
639
640         if( max_to < 0 ) {
641                 max_to = ctx->send_retries;             // convert to retries
642         }
643
644         send_again = 1;                                                                                 // force loop entry
645         group = 0;                                                                                              // always start with group 0
646
647         key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
648         if( msg->sub_id != UNSET_SUBID ) {
649                 altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
650         }
651         while( send_again ) {
652                 sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
653                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
654                                 msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
655
656                 if( ! sock_ok ) {
657                         if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
658                                 altk_ok = 0;
659                                 key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
660                                 send_again = 1;                                                                         // ensure we don't exit the while
661                                 continue;
662                         }
663
664                         msg->state = RMR_ERR_NOENDPT;
665                         errno = ENXIO;                                                                                  // must ensure it's not eagain
666                         return msg;                                                                                             // caller can resend (maybe) or free
667                 }
668
669                 group++;
670
671                 if( send_again ) {
672                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
673                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
674                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
675                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
676                         /*
677                         if( msg ) {
678                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
679                         }
680                         */
681
682                         msg = clone_m;                                                                                  // clone will be the next to send
683                 } else {
684                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
685                 }
686         }
687
688         return msg;                                                                     // last message caries the status of last/only send attempt
689 }
690
691
692 /*
693         A generic wrapper to the real send to keep wormhole stuff agnostic.
694         We assume the wormhole function vetted the buffer so we don't have to.
695 */
696 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
697         return send_msg( ctx, msg, ep->nn_sock, -1 );
698 }
699
700 #endif