5c751d799cb93b82eb343e01db8e21f1816299f5
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40
41 static void dump_40( char *p, char* label ) {
42         int i;
43
44         if( label )
45                 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
46
47         for( i = 0; i < 40; i++ ) {
48                 fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
49         }
50         fprintf( stderr, "\n" );
51 }
52
53 /*
54         Translates the nng state passed in to one of ours that is suitable to put
55         into the message, and sets errno to something that might be useful.
56         If we don't have a specific RMr state, then we return the default (e.g.
57         receive failed).
58
59         The addition of the connection shut error code to the switch requires
60         that the NNG version at commit e618abf8f3db2a94269a (or after) be
61         used for compiling RMR. 
62 */
63 static inline int xlate_si_state( int state, int def_state ) {
64
65         switch( state ) {
66                 case SI_ERR_NONE:
67                         errno = 0;
68                         state = RMR_OK;
69                         break;
70
71                 default:
72                         state = def_state;
73                         errno = EAGAIN;
74                         break;
75
76                 case SI_ERR_QUEUED:
77                 case SI_ERR_TPORT:
78                 case SI_ERR_UPORT:
79                 case SI_ERR_FORK:
80                 case SI_ERR_HANDLE:
81                 case SI_ERR_SESSID:
82                 case SI_ERR_TP:
83                 case SI_ERR_SHUTD:
84                 case SI_ERR_NOFDS:
85                 case SI_ERR_SIGUSR1:
86                 case SI_ERR_SIGUSR2:
87                 case SI_ERR_DISC:
88                 case SI_ERR_TIMEOUT:
89                 case SI_ERR_ORDREL:
90                 case SI_ERR_SIGALRM:
91                 case SI_ERR_NOMEM:
92                 case SI_ERR_ADDR:
93                         errno  = ENOTSUP;
94                         state = def_state;
95                         break;
96         }
97
98         return state;
99 }
100
101 /*
102         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
103         a new message struct as well. Size is the size of the zc buffer to allocate (not
104         including our header). If size is 0, then the buffer allocated is the size previously
105         allocated (if msg is !nil) or the default size given at initialisation).
106
107         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
108         the context value is used.
109
110         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
111                 are zero copy, however ONLY the message is zero copy. We now allocate and use
112                 nng messages.
113 */
114 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
115         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
116         uta_mhdr_t*     hdr;                    // convenience pointer
117         int                     tr_len;                 // trace data len (default or override)
118         int*            alen;                   // convenience pointer to set allocated len
119         //int                   tpb_len;                // transport buffer total len
120 static int logged = 0;
121
122         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
123
124         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
125         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
126         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
127
128         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
129                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
130                 if( msg == NULL ) {
131                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
132                         return NULL;                                                            // we used to exit -- that seems wrong
133                 }
134                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
135         } else {                                                                // user message or message from the ring
136                 if( mlen > msg->alloc_len ) {           // current allocation is too small
137                         msg->alloc_len = 0;                             // force tp_buffer realloc below
138                         if( msg->tp_buf ) {
139                                 free( msg->tp_buf );
140                         }
141                 } else {
142                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
143                 }
144         }
145
146
147         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
148                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
149                 abort( );                                                                                       // toss out a core file for this
150         }
151
152 /*
153         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
154         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
155 */
156         alen = (int *) msg->tp_buf;
157         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
158
159         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
160         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
161         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
162                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
163                 hdr->sub_id = htonl( UNSET_SUBID );
164                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
165                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
166                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
167                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
168         }
169         msg->len = 0;                                                                                   // length of data in the payload
170         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
171         msg->sub_id = UNSET_SUBID;
172         msg->mtype = UNSET_MSGTYPE;
173         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
174         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
175         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
176         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
177         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
178         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
179         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
180
181         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
182
183         return msg;
184 }
185
186 /*
187         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
188         transport receive should allocate that on its own.
189 */
190 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
191         size_t  mlen;
192         uta_mhdr_t* hdr;                        // convenience pointer
193         rmr_mbuf_t* msg;
194
195         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
196                 if( msg->tp_buf ) {
197                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
198                 }
199         } else {
200                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
201                         fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
202                         return NULL;                                                    // this used to exit, but that seems wrong
203                 }
204         }
205
206         memset( msg, 0, sizeof( *msg ) );
207
208         msg->sub_id = UNSET_SUBID;
209         msg->mtype = UNSET_MSGTYPE;
210         msg->tp_buf = NULL;
211         msg->header = NULL;
212         msg->len = -1;                                                                                  // no payload; invalid len
213         msg->alloc_len = 0;
214         msg->payload = NULL;
215         msg->xaction = NULL;
216         msg->state = state;
217         msg->flags = 0;
218         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
219
220         return msg;
221 }
222
223 /*
224         This accepts a message with the assumption that only the tp_buf pointer is valid. It
225         sets all of the various header/payload/xaction pointers in the mbuf to the proper
226         spot in the transport layer buffer.  The len in the header is assumed to be the
227         allocated len (a receive buffer that nng created);
228
229         The alen parm is the assumed allocated length; assumed because it's a value likely
230         to have come from si receive and the actual alloc len might be larger, but we
231         can only assume this is the total usable space. Because we are managing a transport
232         header in the first n bytes of the real msg, we must adjust this length down by the
233         size of the tp header (for testing 50 bytes, but this should change to a struct if
234         we adopt this interface).
235
236         This function returns the message with an error state set if it detects that the
237         received message might have been truncated.  Check is done here as the calculation
238         is somewhat based on header version.
239 */
240 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
241         uta_mhdr_t* hdr = NULL;                 // current header
242         uta_v1mhdr_t* v1hdr;                    // version 1 header
243         int ver;
244         int     hlen;                                           // header len to use for a truncation check
245
246         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;                              // FIX ME:  hard 50 needs to be some kind of tp header struct
247
248         // do NOT reduce alen any more.  alen must be TP_HEADER + RMR_HEADER + user space
249         // get payload size will do the right thing and subtract TP_HEADER and RMR_HEADER lengths
250         //alen -= 50;                                           // actual length of "rmr space" 
251
252         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
253
254         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
255                 ver = 1;
256                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
257         } else {
258                 ver = ntohl( v1hdr->rmr_ver );
259         }
260
261         switch( ver ) {
262                 case 1:
263                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
264                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
265                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
266
267                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
268                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
269                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
270                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
271                         msg->state = RMR_OK;
272                         hlen = sizeof( uta_v1mhdr_t );
273                         break;
274
275                 default:                                                                                                        // current version always lands here
276                         hdr = (uta_mhdr_t *) msg->header;
277                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
278                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
279
280                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
281                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
282                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
283                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
284                         msg->sub_id = ntohl( hdr->sub_id );
285                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
286                         break;
287         }
288
289         if( msg->len > (msg->alloc_len - hlen ) ) {
290                 msg->state = RMR_ERR_TRUNC;
291                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
292         } else {
293                 msg->state = RMR_OK;
294         }
295 }
296
297 /*
298         This will clone a message into a new zero copy buffer and return the cloned message.
299 */
300 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
301         rmr_mbuf_t* nm;                 // new message buffer
302         size_t  mlen;
303         int state;
304         uta_mhdr_t* hdr;
305         uta_v1mhdr_t* v1hdr;
306
307         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
308         if( nm == NULL ) {
309                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
310                 exit( 1 );
311         }
312         memset( nm, 0, sizeof( *nm ) );
313
314         mlen = old_msg->alloc_len;                                                                              // length allocated before
315         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
316                 fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
317                 abort();
318         }
319
320         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
321         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
322         switch( ntohl( v1hdr->rmr_ver ) ) {
323                 case 1:
324                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
325                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
326                         break;
327
328                 default:                                                                                        // current message always caught  here
329                         hdr = nm->header;
330                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
331                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
332                         break;
333         }
334
335         // --- these are all version agnostic -----------------------------------
336         nm->mtype = old_msg->mtype;
337         nm->sub_id = old_msg->sub_id;
338         nm->len = old_msg->len;                                                                 // length of data in the payload
339         nm->alloc_len = mlen;                                                                   // length of allocated payload
340
341         nm->xaction = hdr->xid;                                                                 // reference xaction
342         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
343         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
344         memcpy( nm->payload, old_msg->payload, old_msg->len );
345
346         return nm;
347 }
348
349 /*
350         This will clone a message with a change to the trace area in the header such that
351         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
352         The orignal message will be left unchanged, and a pointer to the new message is returned.
353         It is not possible to realloc buffers and change the data sizes.
354 */
355 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
356         rmr_mbuf_t* nm;                 // new message buffer
357         size_t  mlen;
358         int state;
359         uta_mhdr_t* hdr;
360         uta_v1mhdr_t* v1hdr;
361         int     tr_old_len;                     // tr size in new buffer
362         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
363         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
364
365         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
366         if( nm == NULL ) {
367                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
368                 exit( 1 );
369         }
370         memset( nm, 0, sizeof( *nm ) );
371
372         hdr = old_msg->header;
373         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
374
375         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
376         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
377
378         tpb_len = mlen + TP_HDR_LEN;
379         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
380                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
381                 exit( 1 );
382         }
383         memset( nm->tp_buf, 0, tpb_len );
384         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
385         alen = (int *) nm->tp_buf;
386         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
387
388         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
389
390         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
391         switch( ntohl( v1hdr->rmr_ver ) ) {
392                 case 1:
393                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
394                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
395                         break;
396
397                 default:                                                                                        // current message version always caught  here
398                         hdr = nm->header;
399                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
400                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
401
402                         if( RMR_D1_LEN( hdr )  ) {
403                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
404                         }
405                         if( RMR_D2_LEN( hdr )  ) {
406                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
407                         }
408
409                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
410                         break;
411         }
412
413         // --- these are all version agnostic -----------------------------------
414         nm->mtype = old_msg->mtype;
415         nm->sub_id = old_msg->sub_id;
416         nm->len = old_msg->len;                                                                 // length of data in the payload
417         nm->alloc_len = mlen;                                                                   // length of allocated payload
418
419         nm->xaction = hdr->xid;                                                                 // reference xaction
420         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
421         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
422         memcpy( nm->payload, old_msg->payload, old_msg->len );
423
424         return nm;
425 }
426
427 /*
428         For SI95 based transport all receives are driven through the threaded
429         ring and thus this function should NOT be called. If it is we will panic
430         and abort straight away.
431 */
432 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
433
434 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
435 exit( 1 );
436
437         return NULL;
438 }
439
440 /*
441         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
442         message buffer cannot be used to send, and the length information may or may
443         not be correct (it is set to the length received which might be more than the
444         bytes actually in the payload).
445
446         Mostly this supports the route table collector, but could be extended with an
447         API external function.
448 */
449 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
450         return NULL;
451 /*
452 FIXME: not implemented yet
453         int state;
454         rmr_mbuf_t*     msg = NULL;             // msg received
455         size_t  rsize;                          // nng needs to write back the size received... grrr
456
457         if( old_msg ) {
458                 msg = old_msg;
459         } else {
460                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
461         }
462
463         //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
464         if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
465                 return msg;
466         }
467         rsize = nng_msg_len( msg->tp_buf );
468
469         // do NOT use ref_tpbuf() here! Must fill these in manually.
470         msg->header = nng_msg_body( msg->tp_buf );
471         msg->len = rsize;                                                       // len is the number of bytes received
472         msg->alloc_len = rsize;
473         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
474         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
475         msg->state = RMR_OK;
476         msg->flags = MFL_RAW;
477         msg->payload = msg->header;                                     // payload is the whole thing; no header
478         msg->xaction = NULL;
479
480         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
481
482         return msg;
483 */
484 }
485
486 /*
487         This does the hard work of actually sending the message to the given socket. On success,
488         a new message struct is returned. On error, the original msg is returned with the state
489         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
490         buffer will not be allocated and returned (mostly for call() interal processing since
491         the return message from call() is a received buffer, not a new one).
492
493         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
494         validation has been done prior.
495
496         When msg->state is not ok, this function must set tp_state in the message as some API 
497         fucntions return the message directly and do not propigate errno into the message.
498 */
499 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
500         int state;
501         uta_mhdr_t*     hdr;
502         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
503         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
504         int tot_len;                                                    // total send length (hdr + user data + tp header)
505
506         // future: ensure that application did not overrun the XID buffer; last byte must be 0
507
508         hdr = (uta_mhdr_t *) msg->header;
509         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
510         hdr->sub_id = htonl( msg->sub_id );
511         hdr->plen = htonl( msg->len );
512         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
513
514         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
515                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
516                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
517         }
518
519         if( retries == 0 ) {
520                 spin_retries = 100;
521                 retries++;
522         }
523
524         errno = 0;
525         msg->state = RMR_OK;
526         do {
527                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
528                 *((int*) msg->tp_buf) = tot_len;
529
530                 if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
531                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
532
533                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
534                         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg:  error!! sent state=%d\n", state );
535                         msg->state = state;
536                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
537                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
538                                         retries--;
539                                         if( retries > 0 ) {                                     // only if we'll loop through again
540                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
541                                         }
542                                         spin_retries = 1000;
543                                 }
544                         } else {
545                                 state = 0;                      // don't loop
546                         }
547                 } else {
548                         if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
549                         state = 0;
550                         msg->state = RMR_OK;
551                         hdr = NULL;
552                 }
553         } while( state && retries > 0 );
554
555         if( msg->state == RMR_OK ) {                                                                    // successful send
556                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
557                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
558                 } else {
559                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
560                         return NULL;
561                 }
562         } else {                                                                                        // send failed -- return original message
563                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
564                         errno = EAGAIN;
565                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
566                 } else {
567                         msg->state = RMR_ERR_SENDFAILED;
568                 }
569
570                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
571         }
572
573         return msg;
574 }
575
576 /*
577         send message with maximum timeout.
578         Accept a message and send it to an endpoint based on message type.
579         If NNG reports that the send attempt timed out, or should be retried,
580         RMr will retry for approximately max_to microseconds; rounded to the next
581         higher value of 10.
582
583         Allocates a new message buffer for the next send. If a message type has
584         more than one group of endpoints defined, then the message will be sent
585         in round robin fashion to one endpoint in each group.
586
587         An endpoint will be looked up in the route table using the message type and
588         the subscription id. If the subscription id is "UNSET_SUBID", then only the
589         message type is used.  If the initial lookup, with a subid, fails, then a
590         second lookup using just the mtype is tried.
591
592         When msg->state is not OK, this function must set tp_state in the message as 
593         some API fucntions return the message directly and do not propigate errno into 
594         the message.
595
596         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
597                 it will return with an error and errno set to eagain. If the send is
598                 a limited fanout, then the returned status is the status of the last
599                 send attempt.
600
601 */
602 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
603         endpoint_t*     ep;                                     // end point that we're attempting to send to
604         rtable_ent_t*   rte;                    // the route table entry which matches the message key
605         int     nn_sock;                                        // endpoint socket (fd in si case) for send
606         uta_ctx_t*      ctx;
607         int                     group;                          // selected group to get socket for
608         int                     send_again;                     // true if the message must be sent again
609         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
610         int                     sock_ok;                        // got a valid socket from round robin select
611         char*           d1;
612         int                     ok_sends = 0;           // track number of ok sends
613
614         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
615                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
616                 if( msg != NULL ) {
617                         msg->state = RMR_ERR_BADARG;
618                         errno = EINVAL;                                                                                 // must ensure it's not eagain
619                         msg->tp_state = errno;
620                 }
621                 return msg;
622         }
623
624         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
625         if( msg->header == NULL ) {
626                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
627                 msg->state = RMR_ERR_NOHDR;
628                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
629                 msg->tp_state = errno;
630                 return msg;
631         }
632
633         if( max_to < 0 ) {
634                 max_to = ctx->send_retries;             // convert to retries
635         }
636
637         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
638                 if( ctx->flags & CTXFL_WARN ) {
639                         fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
640                 }
641                 msg->state = RMR_ERR_NOENDPT;
642                 errno = ENXIO;                                                                          // must ensure it's not eagain
643                 msg->tp_state = errno;
644                 return msg;                                                                                     // caller can resend (maybe) or free
645         }
646
647         send_again = 1;                                                                                 // force loop entry
648         group = 0;                                                                                              // always start with group 0
649         while( send_again ) {
650                 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
651
652                 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
653                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
654
655                 group++;
656
657                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
658                         if( send_again ) {
659                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
660                                 if( clone_m == NULL ) {
661                                         msg->state = RMR_ERR_SENDFAILED;
662                                         errno = ENOMEM;
663                                         msg->tp_state = errno;
664                                         if( ctx->flags & CTXFL_WARN ) {
665                                                 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
666                                         }
667                                         return msg;
668                                 }
669
670                                 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
671                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
672                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
673         
674                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
675                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
676                                         msg = clone_m;
677                                 } else {
678                                         ok_sends++;
679                                         msg = clone_m;                                                                          // clone will be the next to send
680                                 }
681                         } else {
682                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
683                                 if( DEBUG ) {
684                                         if( msg == NULL ) {
685                                                 fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
686                                         }
687                                 }
688                         }
689
690                         if( ep != NULL && msg != NULL ) {
691                                 switch( msg->state ) {
692                                         case RMR_OK:
693                                                 ep->scounts[EPSC_GOOD]++;
694                                                 break;
695                                 
696                                         case RMR_ERR_RETRY:
697                                                 ep->scounts[EPSC_TRANS]++;
698                                                 break;
699
700                                         default:
701                                                 ep->scounts[EPSC_FAIL]++;
702                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
703                                                 break;
704                                 }
705                         }
706                 } else {
707 /*
708                         if( ctx->flags & CTXFL_WARN ) {
709                                 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
710                         }
711 */
712                         msg->state = RMR_ERR_NOENDPT;
713                         errno = ENXIO;
714                 }
715         }
716
717         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
718                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
719                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
720                         msg->state = RMR_OK;
721                 }
722         
723                 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
724         
725                 msg->tp_state = errno;
726         }
727
728         return msg;                                                                     // last message caries the status of last/only send attempt
729 }
730
731
732 /*
733         A generic wrapper to the real send to keep wormhole stuff agnostic.
734         We assume the wormhole function vetted the buffer so we don't have to.
735 */
736 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
737         return send_msg( ctx, msg, ep->nn_sock, -1 );
738 }
739
740 #endif