4a7182657fa808a37f2e7dcac6442456d94c2c5c
[ric-plt/lib/rmr.git] / src / rmr / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40 /*
41         Translates the nng state passed in to one of ours that is suitable to put
42         into the message, and sets errno to something that might be useful.
43         If we don't have a specific RMr state, then we return the default (e.g.
44         receive failed).
45 */
46 static inline int xlate_nng_state( int state, int def_state ) {
47
48         switch( state ) {
49                 case 0:
50                         errno = 0;
51                         state = RMR_OK;
52                         break;
53
54                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
55                         state = RMR_ERR_RETRY;
56                         errno = EAGAIN;
57                         break;
58
59                 case NNG_ETIMEDOUT:
60                         state = RMR_ERR_RETRY;
61                         errno = EAGAIN;
62                         break;
63
64                 case NNG_ENOTSUP:
65                         errno  = ENOTSUP;
66                         state = def_state;
67                         break;
68
69                 case NNG_EINVAL:
70                         errno  = EINVAL;
71                         state = def_state;
72                         break;
73
74                 case NNG_ENOMEM:
75                         errno  = ENOMEM;
76                         state = def_state;
77                         break;
78
79                 case NNG_ESTATE:
80                         errno  = EBADFD;                                // file des not in a good state for the operation
81                         state = def_state;
82                         break;
83
84                 case NNG_ECLOSED:
85                         errno  = EBADFD;                                // file des not in a good state for the operation
86                         state = def_state;
87                         break;
88
89                 default:
90                         errno = EBADE;
91                         state = def_state;
92                         break;
93         }
94
95         return state;
96 }
97
98 /*
99         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
100         a new message struct as well. Size is the size of the zc buffer to allocate (not
101         including our header). If size is 0, then the buffer allocated is the size previously
102         allocated (if msg is !nil) or the default size given at initialisation).
103
104         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
105         the context value is used.
106
107         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
108                 are zero copy, however ONLY the message is zero copy. We now allocate and use
109                 nng messages.
110 */
111 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
112         size_t          mlen;                   // size of the transport buffer that we'll allocate
113         uta_mhdr_t*     hdr;                    // convenience pointer
114         int                     tr_len;                 // trace data len (default or override)
115
116         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
117
118         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
119         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
120
121         if( msg == NULL ) {
122                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
123                 if( msg == NULL ) {
124                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
125                         exit( 1 );
126                 }
127         } else {
128                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
129         }
130
131         memset( msg, 0, sizeof( *msg ) );
132
133         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
134                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
135                 abort( );                                                                                       // toss out a core file for this
136         }
137
138         msg->header = nng_msg_body( msg->tp_buf );
139         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
140         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
141                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
142                 hdr->sub_id = htonl( UNSET_SUBID );
143                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
144                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
145                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
146                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
147         }
148         msg->len = 0;                                                                                   // length of data in the payload
149         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
150         msg->sub_id = UNSET_SUBID;
151         msg->mtype = UNSET_MSGTYPE;
152         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
153         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
154         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
155         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
156         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
157         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
158
159         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
160
161         return msg;
162 }
163
164 /*
165         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
166         NNG receive must allocate that on its own.
167 */
168 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
169         size_t  mlen;
170         uta_mhdr_t* hdr;                        // convenience pointer
171         rmr_mbuf_t* msg;
172
173         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
174         if( msg == NULL ) {
175                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
176                 exit( 1 );
177         }
178
179         memset( msg, 0, sizeof( *msg ) );
180
181         msg->sub_id = UNSET_SUBID;
182         msg->mtype = UNSET_MSGTYPE;
183         msg->tp_buf = NULL;
184         msg->header = NULL;
185         msg->len = -1;                                                                                  // no payload; invalid len
186         msg->alloc_len = -1;
187         msg->payload = NULL;
188         msg->xaction = NULL;
189         msg->state = RMR_ERR_UNSET;
190         msg->flags = 0;
191
192         return msg;
193 }
194
195 /*
196         This accepts a message with the assumption that only the tp_buf pointer is valid. It
197         sets all of the various header/payload/xaction pointers in the mbuf to the proper
198         spot in the transport layer buffer.  The len in the header is assumed to be the
199         allocated len (a receive buffer that nng created);
200
201         The alen parm is the assumed allocated length; assumed because it's a value likely
202         to have come from nng receive and the actual alloc len might be larger, but we
203         can only assume this is the total usable space.
204
205         This function returns the message with an error state set if it detects that the
206         received message might have been truncated.  Check is done here as the calculation
207         is somewhat based on header version.
208 */
209 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
210         uta_mhdr_t* hdr = NULL;                 // current header
211         uta_v1mhdr_t* v1hdr;                    // version 1 header
212         int ver;
213         int     hlen;                                           // header len to use for a truncation check
214
215         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
216         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
217
218         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
219                 ver = 1;
220                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
221         } else {
222                 ver = ntohl( v1hdr->rmr_ver );
223         }
224
225         switch( ver ) {
226                 case 1:
227                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
228                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
229                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
230
231                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
232                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
233                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
234                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
235                         msg->state = RMR_OK;
236                         hlen = sizeof( uta_v1mhdr_t );
237                         break;
238
239                 default:                                                                                                        // current version always lands here
240                         hdr = (uta_mhdr_t *) msg->header;
241                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
242                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
243
244                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
245                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
246                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
247                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
248                         msg->sub_id = ntohl( hdr->sub_id );
249                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
250                         break;
251         }
252
253         if( msg->len > (msg->alloc_len - hlen ) ) {
254                 msg->state = RMR_ERR_TRUNC;
255                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
256         } else {
257                 msg->state = RMR_OK;
258         }
259 }
260
261 /*
262         This will clone a message into a new zero copy buffer and return the cloned message.
263 */
264 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
265         rmr_mbuf_t* nm;                 // new message buffer
266         size_t  mlen;
267         int state;
268         uta_mhdr_t* hdr;
269         uta_v1mhdr_t* v1hdr;
270
271         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
272         if( nm == NULL ) {
273                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
274                 exit( 1 );
275         }
276         memset( nm, 0, sizeof( *nm ) );
277
278         mlen = old_msg->alloc_len;                                                                              // length allocated before
279         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
280                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
281                 exit( 1 );
282         }
283
284         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
285         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
286         switch( ntohl( v1hdr->rmr_ver ) ) {
287                 case 1:
288                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
289                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
290                         break;
291
292                 default:                                                                                        // current message always caught  here
293                         hdr = nm->header;
294                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
295                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
296                         break;
297         }
298
299         // --- these are all version agnostic -----------------------------------
300         nm->mtype = old_msg->mtype;
301         nm->sub_id = old_msg->sub_id;
302         nm->len = old_msg->len;                                                                 // length of data in the payload
303         nm->alloc_len = mlen;                                                                   // length of allocated payload
304
305         nm->xaction = hdr->xid;                                                                 // reference xaction
306         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
307         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
308         memcpy( nm->payload, old_msg->payload, old_msg->len );
309
310         return nm;
311 }
312
313 /*
314         This will clone a message with a change to the trace area in the header such that
315         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
316         The orignal message will be left unchanged, and a pointer to the new message is returned.
317         It is not possible to realloc buffers and change the data sizes.
318 */
319 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
320         rmr_mbuf_t* nm;                 // new message buffer
321         size_t  mlen;
322         int state;
323         uta_mhdr_t* hdr;
324         uta_v1mhdr_t* v1hdr;
325         int     tr_old_len;                     // tr size in new buffer
326
327         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
328         if( nm == NULL ) {
329                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
330                 exit( 1 );
331         }
332         memset( nm, 0, sizeof( *nm ) );
333
334         hdr = old_msg->header;
335         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
336
337         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
338         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
339         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
340                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
341                 exit( 1 );
342         }
343
344         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
345         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
346         switch( ntohl( v1hdr->rmr_ver ) ) {
347                 case 1:
348                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
349                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
350                         break;
351
352                 default:                                                                                        // current message version always caught  here
353                         hdr = nm->header;
354                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
355                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
356
357                         if( RMR_D1_LEN( hdr )  ) {
358                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
359                         }
360                         if( RMR_D2_LEN( hdr )  ) {
361                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
362                         }
363
364                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
365                         break;
366         }
367
368         // --- these are all version agnostic -----------------------------------
369         nm->mtype = old_msg->mtype;
370         nm->sub_id = old_msg->sub_id;
371         nm->len = old_msg->len;                                                                 // length of data in the payload
372         nm->alloc_len = mlen;                                                                   // length of allocated payload
373
374         nm->xaction = hdr->xid;                                                                 // reference xaction
375         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
376         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
377         memcpy( nm->payload, old_msg->payload, old_msg->len );
378
379         return nm;
380 }
381
382 /*
383         This is the receive work horse used by the outer layer receive functions.
384         It waits for a message to be received on our listen socket. If old msg
385         is passed in, the we assume we can use it instead of allocating a new
386         one, else a new block of memory is allocated.
387
388         This allocates a zero copy message so that if the user wishes to call
389         rmr_rts_msg() the send is zero copy.
390
391         The nng timeout on send is at the ms level which is a tad too long for
392         our needs.  So, if NNG returns eagain or timedout (we don't set one)
393         we will loop up to 5 times with a 10 microsecond delay between each
394         attempt.  If at the end of this set of retries NNG is still saying
395         eagain/timeout we'll return to the caller with that set in errno.
396         Right now this is only for zero-copy buffers (they should all be zc
397         buffers now).
398
399
400         In the NNG msg world it must allocate the receive buffer rather
401         than accepting one that we allocated from their space and could
402         reuse.  They have their reasons I guess.  Thus, we will free
403         the old transport buffer if user passes the message in; at least
404         our mbuf will be reused.
405
406         When msg->state is not ok, this function must set tp_state in the message as some API 
407         fucntions return the message directly and do not propigate errno into the message.
408 */
409 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
410         int state;
411         rmr_mbuf_t*     msg = NULL;             // msg received
412         uta_mhdr_t* hdr;
413         size_t  rsize;                          // nng needs to write back the size received... grrr
414
415         if( old_msg ) {
416                 msg = old_msg;
417                 if( msg->tp_buf != NULL ) {
418                         nng_msg_free( msg->tp_buf );
419                 }
420
421                 msg->tp_buf = NULL;
422         } else {
423                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
424         }
425
426         msg->alloc_len = 0;
427         msg->len = 0;
428         msg->payload = NULL;
429         msg->xaction = NULL;
430         msg->tp_buf = NULL;
431
432         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
433         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
434                 msg->tp_state = errno;
435                 return msg;
436         }
437
438         msg->tp_state = 0;
439         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
440                 msg->state = RMR_ERR_EMPTY;
441                 msg->tp_state = 0;
442                 return msg;
443         }
444
445         rsize = nng_msg_len( msg->tp_buf );
446         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
447                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
448                 hdr = (uta_mhdr_t *) msg->header;
449                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
450
451                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
452                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
453         } else {
454                 msg->state = RMR_ERR_EMPTY;
455                 msg->tp_state = 0;
456                 msg->len = 0;
457                 msg->alloc_len = rsize;
458                 msg->payload = NULL;
459                 msg->xaction = NULL;
460                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
461                 msg->mtype = UNSET_MSGTYPE;
462                 msg->sub_id = UNSET_SUBID;
463         }
464
465         return msg;
466 }
467
468 /*
469         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
470         message buffer cannot be used to send, and the length information may or may
471         not be correct (it is set to the length received which might be more than the
472         bytes actually in the payload).
473
474         Mostly this supports the route table collector, but could be extended with an
475         API external function.
476 */
477 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
478         int state;
479         rmr_mbuf_t*     msg = NULL;             // msg received
480         size_t  rsize;                          // nng needs to write back the size received... grrr
481
482         if( old_msg ) {
483                 msg = old_msg;
484         } else {
485                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
486         }
487
488         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
489         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
490                 return msg;
491         }
492         rsize = nng_msg_len( msg->tp_buf );
493
494         // do NOT use ref_tpbuf() here! Must fill these in manually.
495         msg->header = nng_msg_body( msg->tp_buf );
496         msg->len = rsize;                                                       // len is the number of bytes received
497         msg->alloc_len = rsize;
498         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
499         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
500         msg->state = RMR_OK;
501         msg->flags = MFL_RAW;
502         msg->payload = msg->header;                                     // payload is the whole thing; no header
503         msg->xaction = NULL;
504
505         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
506
507         return msg;
508 }
509
510 /*
511         This does the hard work of actually sending the message to the given socket. On success,
512         a new message struct is returned. On error, the original msg is returned with the state
513         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
514         buffer will not be allocated and returned (mostly for call() interal processing since
515         the return message from call() is a received buffer, not a new one).
516
517         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
518         validation has been done prior.
519
520         When msg->state is not ok, this function must set tp_state in the message as some API 
521         fucntions return the message directly and do not propigate errno into the message.
522 */
523 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
524         int state;
525         uta_mhdr_t*     hdr;
526         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
527         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
528         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
529
530         // future: ensure that application did not overrun the XID buffer; last byte must be 0
531
532         hdr = (uta_mhdr_t *) msg->header;
533         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
534         hdr->sub_id = htonl( msg->sub_id );
535         hdr->plen = htonl( msg->len );
536         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
537
538         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
539                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
540                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
541         }
542
543         errno = 0;
544         msg->state = RMR_OK;
545         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
546                 do {
547                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
548                                 msg->state = state;
549                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
550                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
551                                                 retries--;
552                                                 if( retries > 0 ) {                                     // only if we'll loop through again
553                                                         usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
554                                                 }
555                                                 spin_retries = 1000;
556                                         }
557                                 } else {
558                                         state = 0;                      // don't loop
559                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
560                                 }
561                         } else {
562                                 state = 0;
563                                 msg->state = RMR_OK;
564                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
565                                 msg->tp_buf = NULL;
566                                 hdr = NULL;
567                         }
568                 } while( state && retries > 0 );
569         } else {
570                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
571                 msg->state = RMR_ERR_SENDFAILED;
572                 errno = ENOTSUP;
573                 msg->tp_state = errno;
574                 return msg;
575                 /*
576                 NOT SUPPORTED
577                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
578                         msg->state = state;
579                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
580                 }
581                 */
582         }
583
584         if( msg->state == RMR_OK ) {                                                                    // successful send
585                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
586                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
587                 } else {
588                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
589                         return NULL;
590                 }
591         } else {                                                                                        // send failed -- return original message
592                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
593                         errno = EAGAIN;
594                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
595                 } else {
596                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
597                 }
598
599                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
600         }
601
602         return msg;
603 }
604
605 /*
606         send message with maximum timeout.
607         Accept a message and send it to an endpoint based on message type.
608         If NNG reports that the send attempt timed out, or should be retried,
609         RMr will retry for approximately max_to microseconds; rounded to the next
610         higher value of 10.
611
612         Allocates a new message buffer for the next send. If a message type has
613         more than one group of endpoints defined, then the message will be sent
614         in round robin fashion to one endpoint in each group.
615
616         An endpoint will be looked up in the route table using the message type and
617         the subscription id. If the subscription id is "UNSET_SUBID", then only the
618         message type is used.  If the initial lookup, with a subid, fails, then a
619         second lookup using just the mtype is tried.
620
621         When msg->state is not OK, this function must set tp_state in the message as 
622         some API fucntions return the message directly and do not propigate errno into 
623         the message.
624
625         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
626                 it will return with an error and errno set to eagain. If the send is
627                 a limited fanout, then the returned status is the status of the last
628                 send attempt.
629
630 */
631 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
632         nng_socket      nn_sock;                        // endpoint socket for send
633         uta_ctx_t*      ctx;
634         int                     group;                          // selected group to get socket for
635         int                     send_again;                     // true if the message must be sent again
636         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
637         int                     sock_ok;                        // got a valid socket from round robin select
638         uint64_t         key;                           // mtype or sub-id/mtype sym table key
639         int                     altk_ok = 0;            // set true if we can lookup on alternate key if mt/sid lookup fails
640         char*           d1;
641
642         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
643                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
644                 if( msg != NULL ) {
645                         msg->state = RMR_ERR_BADARG;
646                         errno = EINVAL;                                                                                 // must ensure it's not eagain
647                         msg->tp_state = errno;
648                 }
649                 return msg;
650         }
651
652         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
653         if( msg->header == NULL ) {
654                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
655                 msg->state = RMR_ERR_NOHDR;
656                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
657                 msg->tp_state = errno;
658                 return msg;
659         }
660
661         if( max_to < 0 ) {
662                 max_to = ctx->send_retries;             // convert to retries
663         }
664
665         send_again = 1;                                                                                 // force loop entry
666         group = 0;                                                                                              // always start with group 0
667
668         key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
669         if( msg->sub_id != UNSET_SUBID ) {
670                 altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
671         }
672         while( send_again ) {
673                 sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
674                 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
675                                 msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
676
677                 if( ! sock_ok ) {
678                         if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
679                                 altk_ok = 0;
680                                 key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
681                                 send_again = 1;                                                                         // ensure we don't exit the while
682                                 continue;
683                         }
684
685                         msg->state = RMR_ERR_NOENDPT;
686                         errno = ENXIO;                                                                                  // must ensure it's not eagain
687                         msg->tp_state = errno;
688                         return msg;                                                                                             // caller can resend (maybe) or free
689                 }
690
691                 group++;
692
693                 if( send_again ) {
694                         clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
695                         if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
696                         msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
697                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
698                         /*
699                         if( msg ) {
700                                 // error do we need to count successes/errors, how to report some success, esp if last fails?
701                         }
702                         */
703
704                         msg = clone_m;                                                                                  // clone will be the next to send
705                 } else {
706                         msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
707                 }
708         }
709
710         return msg;                                                                     // last message caries the status of last/only send attempt
711 }
712
713
714 /*
715         A generic wrapper to the real send to keep wormhole stuff agnostic.
716         We assume the wormhole function vetted the buffer so we don't have to.
717 */
718 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
719         return send_msg( ctx, msg, ep->nn_sock, -1 );
720 }
721
722 #endif