5ca44033bc2d2620fc3661e546b05df9695e400f
[ric-plt/lib/rmr.git] / src / rmr / nng / src / sr_nng_static.c
1 // : vi ts=4 sw=4 noet 2
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_nng_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_nng_static_c
32 #define _sr_nng_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40 /*
41         Translates the nng state passed in to one of ours that is suitable to put
42         into the message, and sets errno to something that might be useful.
43         If we don't have a specific RMr state, then we return the default (e.g.
44         receive failed).
45
46         The addition of the connection shut error code to the switch requires
47         that the NNG version at commit e618abf8f3db2a94269a (or after) be
48         used for compiling RMR. 
49 */
50 static inline int xlate_nng_state( int state, int def_state ) {
51
52         switch( state ) {
53                 case 0:
54                         errno = 0;
55                         state = RMR_OK;
56                         break;
57
58                 case NNG_EAGAIN:                                // soft errors get retry as the RMr error
59                         state = RMR_ERR_RETRY;
60                         errno = EAGAIN;
61                         break;
62
63                 case NNG_ETIMEDOUT:
64                         state = RMR_ERR_RETRY;
65                         errno = EAGAIN;
66                         break;
67
68                 case NNG_ENOTSUP:
69                         errno  = ENOTSUP;
70                         state = def_state;
71                         break;
72
73                 case NNG_EINVAL:
74                         errno  = EINVAL;
75                         state = def_state;
76                         break;
77
78                 case NNG_ENOMEM:
79                         errno  = ENOMEM;
80                         state = def_state;
81                         break;
82
83                 case NNG_ESTATE:
84                         errno  = EBADFD;                                // file des not in a good state for the operation
85                         state = def_state;
86                         break;
87
88                 case NNG_ECONNSHUT:                                     // new error with nng commit e618abf8f3db2a94269a79c8901a51148d48fcc2 (Sept 2019)
89                 case NNG_ECLOSED:
90                         errno  = EBADFD;                                // file des not in a good state for the operation
91                         state = def_state;
92                         break;
93
94                 default:
95                         errno = EBADE;
96                         state = def_state;
97                         break;
98         }
99
100         return state;
101 }
102
103 /*
104         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
105         a new message struct as well. Size is the size of the zc buffer to allocate (not
106         including our header). If size is 0, then the buffer allocated is the size previously
107         allocated (if msg is !nil) or the default size given at initialisation).
108
109         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
110         the context value is used.
111
112         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
113                 are zero copy, however ONLY the message is zero copy. We now allocate and use
114                 nng messages.
115 */
116 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
117         size_t          mlen;                   // size of the transport buffer that we'll allocate
118         uta_mhdr_t*     hdr;                    // convenience pointer
119         int                     tr_len;                 // trace data len (default or override)
120
121         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
122
123         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
124         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
125
126         if( msg == NULL ) {
127                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
128                 if( msg == NULL ) {
129                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
130                         exit( 1 );
131                 }
132         } else {
133                 mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
134         }
135
136         memset( msg, 0, sizeof( *msg ) );
137
138         if( (state = nng_msg_alloc( (nng_msg **) &msg->tp_buf, mlen )) != 0 ) {
139                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d\n", ENOMEM );
140                 abort( );                                                                                       // toss out a core file for this
141         }
142
143         msg->header = nng_msg_body( msg->tp_buf );
144         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
145         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
146                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
147                 hdr->sub_id = htonl( UNSET_SUBID );
148                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
149                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
150                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
151                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
152         }
153         msg->len = 0;                                                                                   // length of data in the payload
154         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
155         msg->sub_id = UNSET_SUBID;
156         msg->mtype = UNSET_MSGTYPE;
157         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
158         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
159         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
160         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
161         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
162         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
163
164         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
165
166         return msg;
167 }
168
169 /*
170         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
171         NNG receive must allocate that on its own.
172 */
173 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
174         size_t  mlen;
175         uta_mhdr_t* hdr;                        // convenience pointer
176         rmr_mbuf_t* msg;
177
178         msg = (rmr_mbuf_t *) malloc( sizeof *msg );
179         if( msg == NULL ) {
180                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
181                 exit( 1 );
182         }
183
184         memset( msg, 0, sizeof( *msg ) );
185
186         msg->sub_id = UNSET_SUBID;
187         msg->mtype = UNSET_MSGTYPE;
188         msg->tp_buf = NULL;
189         msg->header = NULL;
190         msg->len = -1;                                                                                  // no payload; invalid len
191         msg->alloc_len = -1;
192         msg->payload = NULL;
193         msg->xaction = NULL;
194         msg->state = RMR_ERR_UNSET;
195         msg->flags = 0;
196
197         return msg;
198 }
199
200 /*
201         This accepts a message with the assumption that only the tp_buf pointer is valid. It
202         sets all of the various header/payload/xaction pointers in the mbuf to the proper
203         spot in the transport layer buffer.  The len in the header is assumed to be the
204         allocated len (a receive buffer that nng created);
205
206         The alen parm is the assumed allocated length; assumed because it's a value likely
207         to have come from nng receive and the actual alloc len might be larger, but we
208         can only assume this is the total usable space.
209
210         This function returns the message with an error state set if it detects that the
211         received message might have been truncated.  Check is done here as the calculation
212         is somewhat based on header version.
213 */
214 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
215         uta_mhdr_t* hdr = NULL;                 // current header
216         uta_v1mhdr_t* v1hdr;                    // version 1 header
217         int ver;
218         int     hlen;                                           // header len to use for a truncation check
219
220         msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
221         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
222
223         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
224                 ver = 1;
225                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
226         } else {
227                 ver = ntohl( v1hdr->rmr_ver );
228         }
229
230         switch( ver ) {
231                 case 1:
232                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
233                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
234                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
235
236                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
237                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
238                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
239                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
240                         msg->state = RMR_OK;
241                         hlen = sizeof( uta_v1mhdr_t );
242                         break;
243
244                 default:                                                                                                        // current version always lands here
245                         hdr = (uta_mhdr_t *) msg->header;
246                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
247                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
248
249                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
250                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
251                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
252                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
253                         msg->sub_id = ntohl( hdr->sub_id );
254                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
255                         break;
256         }
257
258         if( msg->len > (msg->alloc_len - hlen ) ) {
259                 msg->state = RMR_ERR_TRUNC;
260                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
261         } else {
262                 msg->state = RMR_OK;
263         }
264 }
265
266 /*
267         This will clone a message into a new zero copy buffer and return the cloned message.
268 */
269 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
270         rmr_mbuf_t* nm;                 // new message buffer
271         size_t  mlen;
272         int state;
273         uta_mhdr_t* hdr;
274         uta_v1mhdr_t* v1hdr;
275
276         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
277         if( nm == NULL ) {
278                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
279                 exit( 1 );
280         }
281         memset( nm, 0, sizeof( *nm ) );
282
283         mlen = old_msg->alloc_len;                                                                              // length allocated before
284         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
285                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
286                 exit( 1 );
287         }
288
289         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
290         v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
291         switch( ntohl( v1hdr->rmr_ver ) ) {
292                 case 1:
293                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
294                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
295                         break;
296
297                 default:                                                                                        // current message always caught  here
298                         hdr = nm->header;
299                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
300                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
301                         break;
302         }
303
304         // --- these are all version agnostic -----------------------------------
305         nm->mtype = old_msg->mtype;
306         nm->sub_id = old_msg->sub_id;
307         nm->len = old_msg->len;                                                                 // length of data in the payload
308         nm->alloc_len = mlen;                                                                   // length of allocated payload
309
310         nm->xaction = hdr->xid;                                                                 // reference xaction
311         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
312         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
313         memcpy( nm->payload, old_msg->payload, old_msg->len );
314
315         return nm;
316 }
317
318 /*
319         This will clone a message with a change to the trace area in the header such that
320         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
321         The orignal message will be left unchanged, and a pointer to the new message is returned.
322         It is not possible to realloc buffers and change the data sizes.
323 */
324 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
325         rmr_mbuf_t* nm;                 // new message buffer
326         size_t  mlen;
327         int state;
328         uta_mhdr_t* hdr;
329         uta_v1mhdr_t* v1hdr;
330         int     tr_old_len;                     // tr size in new buffer
331
332         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
333         if( nm == NULL ) {
334                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
335                 exit( 1 );
336         }
337         memset( nm, 0, sizeof( *nm ) );
338
339         hdr = old_msg->header;
340         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
341
342         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
343         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
344         if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
345                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
346                 exit( 1 );
347         }
348
349         nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
350         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
351         switch( ntohl( v1hdr->rmr_ver ) ) {
352                 case 1:
353                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
354                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
355                         break;
356
357                 default:                                                                                        // current message version always caught  here
358                         hdr = nm->header;
359                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
360                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
361
362                         if( RMR_D1_LEN( hdr )  ) {
363                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
364                         }
365                         if( RMR_D2_LEN( hdr )  ) {
366                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
367                         }
368
369                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
370                         break;
371         }
372
373         // --- these are all version agnostic -----------------------------------
374         nm->mtype = old_msg->mtype;
375         nm->sub_id = old_msg->sub_id;
376         nm->len = old_msg->len;                                                                 // length of data in the payload
377         nm->alloc_len = mlen;                                                                   // length of allocated payload
378
379         nm->xaction = hdr->xid;                                                                 // reference xaction
380         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
381         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
382         memcpy( nm->payload, old_msg->payload, old_msg->len );
383
384         return nm;
385 }
386
387 /*
388         This is the receive work horse used by the outer layer receive functions.
389         It waits for a message to be received on our listen socket. If old msg
390         is passed in, the we assume we can use it instead of allocating a new
391         one, else a new block of memory is allocated.
392
393         This allocates a zero copy message so that if the user wishes to call
394         rmr_rts_msg() the send is zero copy.
395
396         The nng timeout on send is at the ms level which is a tad too long for
397         our needs.  So, if NNG returns eagain or timedout (we don't set one)
398         we will loop up to 5 times with a 10 microsecond delay between each
399         attempt.  If at the end of this set of retries NNG is still saying
400         eagain/timeout we'll return to the caller with that set in errno.
401         Right now this is only for zero-copy buffers (they should all be zc
402         buffers now).
403
404
405         In the NNG msg world it must allocate the receive buffer rather
406         than accepting one that we allocated from their space and could
407         reuse.  They have their reasons I guess.  Thus, we will free
408         the old transport buffer if user passes the message in; at least
409         our mbuf will be reused.
410
411         When msg->state is not ok, this function must set tp_state in the message as some API 
412         fucntions return the message directly and do not propigate errno into the message.
413 */
414 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
415         int state;
416         rmr_mbuf_t*     msg = NULL;             // msg received
417         uta_mhdr_t* hdr;
418         size_t  rsize;                          // nng needs to write back the size received... grrr
419
420         if( old_msg ) {
421                 msg = old_msg;
422                 if( msg->tp_buf != NULL ) {
423                         nng_msg_free( msg->tp_buf );
424                 }
425
426                 msg->tp_buf = NULL;
427         } else {
428                 msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
429         }
430
431         msg->alloc_len = 0;
432         msg->len = 0;
433         msg->payload = NULL;
434         msg->xaction = NULL;
435         msg->tp_buf = NULL;
436
437         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
438         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
439                 msg->tp_state = errno;
440                 return msg;
441         }
442
443         msg->tp_state = 0;
444         if( msg->tp_buf == NULL ) {             // if state is good this _should_ not be nil, but parninoia says check anyway
445                 msg->state = RMR_ERR_EMPTY;
446                 msg->tp_state = 0;
447                 return msg;
448         }
449
450         rsize = nng_msg_len( msg->tp_buf );
451         if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
452                 ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
453                 hdr = (uta_mhdr_t *) msg->header;
454                 msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
455
456                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
457                                 msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
458         } else {
459                 msg->state = RMR_ERR_EMPTY;
460                 msg->tp_state = 0;
461                 msg->len = 0;
462                 msg->alloc_len = rsize;
463                 msg->payload = NULL;
464                 msg->xaction = NULL;
465                 msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
466                 msg->mtype = UNSET_MSGTYPE;
467                 msg->sub_id = UNSET_SUBID;
468         }
469
470         return msg;
471 }
472
473 /*
474         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
475         message buffer cannot be used to send, and the length information may or may
476         not be correct (it is set to the length received which might be more than the
477         bytes actually in the payload).
478
479         Mostly this supports the route table collector, but could be extended with an
480         API external function.
481 */
482 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
483         int state;
484         rmr_mbuf_t*     msg = NULL;             // msg received
485         size_t  rsize;                          // nng needs to write back the size received... grrr
486
487         if( old_msg ) {
488                 msg = old_msg;
489         } else {
490                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
491         }
492
493         msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
494         if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
495                 return msg;
496         }
497         rsize = nng_msg_len( msg->tp_buf );
498
499         // do NOT use ref_tpbuf() here! Must fill these in manually.
500         msg->header = nng_msg_body( msg->tp_buf );
501         msg->len = rsize;                                                       // len is the number of bytes received
502         msg->alloc_len = rsize;
503         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
504         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
505         msg->state = RMR_OK;
506         msg->flags = MFL_RAW;
507         msg->payload = msg->header;                                     // payload is the whole thing; no header
508         msg->xaction = NULL;
509
510         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
511
512         return msg;
513 }
514
515 /*
516         This does the hard work of actually sending the message to the given socket. On success,
517         a new message struct is returned. On error, the original msg is returned with the state
518         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
519         buffer will not be allocated and returned (mostly for call() interal processing since
520         the return message from call() is a received buffer, not a new one).
521
522         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
523         validation has been done prior.
524
525         When msg->state is not ok, this function must set tp_state in the message as some API 
526         fucntions return the message directly and do not propigate errno into the message.
527 */
528 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries ) {
529         int state;
530         uta_mhdr_t*     hdr;
531         int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
532         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
533         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
534
535         // future: ensure that application did not overrun the XID buffer; last byte must be 0
536
537         hdr = (uta_mhdr_t *) msg->header;
538         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
539         hdr->sub_id = htonl( msg->sub_id );
540         hdr->plen = htonl( msg->len );
541         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
542
543         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
544                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
545                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
546         }
547
548         errno = 0;
549         msg->state = RMR_OK;
550         if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
551                 do {
552                         if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
553                                 msg->state = state;
554                                 if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
555                                         if( --spin_retries <= 0 ) {                     // don't give up the processor if we don't have to
556                                                 retries--;
557                                                 if( retries > 0 ) {                                     // only if we'll loop through again
558                                                         usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
559                                                 }
560                                                 spin_retries = 1000;
561                                         }
562                                 } else {
563                                         state = 0;                      // don't loop
564                                         //if( DEBUG ) fprintf( stderr, ">>>>> send failed: %s\n", nng_strerror( state ) );
565                                 }
566                         } else {
567                                 state = 0;
568                                 msg->state = RMR_OK;
569                                 msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
570                                 msg->tp_buf = NULL;
571                                 hdr = NULL;
572                         }
573                 } while( state && retries > 0 );
574         } else {
575                 // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
576                 msg->state = RMR_ERR_SENDFAILED;
577                 errno = ENOTSUP;
578                 msg->tp_state = errno;
579                 return msg;
580                 /*
581                 NOT SUPPORTED
582                 if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
583                         msg->state = state;
584                         //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
585                 }
586                 */
587         }
588
589         if( msg->state == RMR_OK ) {                                                                    // successful send
590                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
591                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
592                 } else {
593                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
594                         return NULL;
595                 }
596         } else {                                                                                        // send failed -- return original message
597                 if( msg->state == NNG_EAGAIN || msg->state == NNG_ETIMEDOUT ) {
598                         errno = EAGAIN;
599                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
600                 } else {
601                         msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
602                 }
603
604                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
605         }
606
607         return msg;
608 }
609
610 /*
611         send message with maximum timeout.
612         Accept a message and send it to an endpoint based on message type.
613         If NNG reports that the send attempt timed out, or should be retried,
614         RMr will retry for approximately max_to microseconds; rounded to the next
615         higher value of 10.
616
617         Allocates a new message buffer for the next send. If a message type has
618         more than one group of endpoints defined, then the message will be sent
619         in round robin fashion to one endpoint in each group.
620
621         An endpoint will be looked up in the route table using the message type and
622         the subscription id. If the subscription id is "UNSET_SUBID", then only the
623         message type is used.  If the initial lookup, with a subid, fails, then a
624         second lookup using just the mtype is tried.
625
626         When msg->state is not OK, this function must set tp_state in the message as 
627         some API fucntions return the message directly and do not propigate errno into 
628         the message.
629
630         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
631                 it will return with an error and errno set to eagain. If the send is
632                 a limited fanout, then the returned status is the status of the last
633                 send attempt.
634
635 */
636 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
637         rtable_ent_t*   rte;                    // the route table entry which matches the message key
638         nng_socket      nn_sock;                        // endpoint socket for send
639         uta_ctx_t*      ctx;
640         int                     group;                          // selected group to get socket for
641         int                     send_again;                     // true if the message must be sent again
642         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
643         int                     sock_ok;                        // got a valid socket from round robin select
644         char*           d1;
645         int                     ok_sends = 0;           // track number of ok sends
646
647         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
648                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
649                 if( msg != NULL ) {
650                         msg->state = RMR_ERR_BADARG;
651                         errno = EINVAL;                                                                                 // must ensure it's not eagain
652                         msg->tp_state = errno;
653                 }
654                 return msg;
655         }
656
657         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
658         if( msg->header == NULL ) {
659                 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
660                 msg->state = RMR_ERR_NOHDR;
661                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
662                 msg->tp_state = errno;
663                 return msg;
664         }
665
666         if( max_to < 0 ) {
667                 max_to = ctx->send_retries;             // convert to retries
668         }
669
670         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
671                 if( ctx->flags & CTXFL_WARN ) {
672                         fprintf( stderr, "[WARN] no endpoint for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
673                 }
674                 msg->state = RMR_ERR_NOENDPT;
675                 errno = ENXIO;                                                                          // must ensure it's not eagain
676                 msg->tp_state = errno;
677                 return msg;                                                                                     // caller can resend (maybe) or free
678         }
679
680         send_again = 1;                                                                                 // force loop entry
681         group = 0;                                                                                              // always start with group 0
682         while( send_again ) {
683                 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock );                                                           // select endpt from rr group and set again if more groups
684
685                 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
686                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
687
688                 group++;
689
690                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
691                         if( send_again ) {
692                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
693                                 if( clone_m == NULL ) {
694                                         msg->state = RMR_ERR_SENDFAILED;
695                                         errno = ENOMEM;
696                                         msg->tp_state = errno;
697                                         if( ctx->flags & CTXFL_WARN ) {
698                                                 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
699                                         }
700                                         return msg;
701                                 }
702
703                                 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
704                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
705                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
706         
707                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
708                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
709                                         msg = clone_m;
710                                 } else {
711                                         ok_sends++;
712                                         msg = clone_m;                                                                          // clone will be the next to send
713                                 }
714                         } else {
715                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
716                                 if( DEBUG ) {
717                                         if( msg == NULL ) {
718                                                 fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
719                                         }
720                                 }
721                         }
722                 } else {
723                         if( ctx->flags & CTXFL_WARN ) {
724                                 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
725                         }
726                         msg->state = RMR_ERR_NOENDPT;
727                         errno = ENXIO;
728                 }
729         }
730
731         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
732                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
733                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
734                         msg->state = RMR_OK;
735                 }
736         
737                 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
738         
739                 msg->tp_state = errno;
740         }
741
742         return msg;                                                                     // last message caries the status of last/only send attempt
743 }
744
745
746 /*
747         A generic wrapper to the real send to keep wormhole stuff agnostic.
748         We assume the wormhole function vetted the buffer so we don't have to.
749 */
750 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
751         return send_msg( ctx, msg, ep->nn_sock, -1 );
752 }
753
754 #endif