Add dynamic route table update to SI95 support
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs nanomsg).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 #include <nng/nng.h>
35 #include <nng/protocol/pubsub0/pub.h>
36 #include <nng/protocol/pubsub0/sub.h>
37 #include <nng/protocol/pipeline0/push.h>
38 #include <nng/protocol/pipeline0/pull.h>
39
40
41 static void dump_40( char *p, char* label ) {
42         int i;
43
44         if( label )
45                 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
46
47         for( i = 0; i < 40; i++ ) {
48                 fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
49         }
50         fprintf( stderr, "\n" );
51 }
52
53 /*
54         Translates the nng state passed in to one of ours that is suitable to put
55         into the message, and sets errno to something that might be useful.
56         If we don't have a specific RMr state, then we return the default (e.g.
57         receive failed).
58
59         The addition of the connection shut error code to the switch requires
60         that the NNG version at commit e618abf8f3db2a94269a (or after) be
61         used for compiling RMR. 
62 */
63 static inline int xlate_si_state( int state, int def_state ) {
64
65         switch( state ) {
66                 case SI_ERR_NONE:
67                         errno = 0;
68                         state = RMR_OK;
69                         break;
70
71                 default:
72                         state = def_state;
73                         errno = EAGAIN;
74                         break;
75
76                 case SI_ERR_QUEUED:
77                 case SI_ERR_TPORT:
78                 case SI_ERR_UPORT:
79                 case SI_ERR_FORK:
80                 case SI_ERR_HANDLE:
81                 case SI_ERR_SESSID:
82                 case SI_ERR_TP:
83                 case SI_ERR_SHUTD:
84                 case SI_ERR_NOFDS:
85                 case SI_ERR_SIGUSR1:
86                 case SI_ERR_SIGUSR2:
87                 case SI_ERR_DISC:
88                 case SI_ERR_TIMEOUT:
89                 case SI_ERR_ORDREL:
90                 case SI_ERR_SIGALRM:
91                 case SI_ERR_NOMEM:
92                 case SI_ERR_ADDR:
93                         errno  = ENOTSUP;
94                         state = def_state;
95                         break;
96         }
97
98         return state;
99 }
100
101 /*
102         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
103         a new message struct as well. Size is the size of the zc buffer to allocate (not
104         including our header). If size is 0, then the buffer allocated is the size previously
105         allocated (if msg is !nil) or the default size given at initialisation).
106
107         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
108         the context value is used.
109
110         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
111                 are zero copy, however ONLY the message is zero copy. We now allocate and use
112                 nng messages.
113 */
114 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
115         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
116         uta_mhdr_t*     hdr;                    // convenience pointer
117         int                     tr_len;                 // trace data len (default or override)
118         int*            alen;                   // convenience pointer to set allocated len
119
120         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
121
122         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
123         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
124         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
125
126         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
127                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
128                 if( msg == NULL ) {
129                         fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
130                         return NULL;                                                            // we used to exit -- that seems wrong
131                 }
132                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
133         } else {                                                                // user message or message from the ring
134                 if( mlen > msg->alloc_len ) {           // current allocation is too small
135                         msg->alloc_len = 0;                             // force tp_buffer realloc below
136                         if( msg->tp_buf ) {
137                                 free( msg->tp_buf );
138                         }
139                 } else {
140                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
141                 }
142         }
143
144
145         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
146                 fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
147                 abort( );                                                                                       // toss out a core file for this
148         }
149
150 /*
151         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
152         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
153 */
154         alen = (int *) msg->tp_buf;
155         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
156
157         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
158         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
159         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
160                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
161                 hdr->sub_id = htonl( UNSET_SUBID );
162                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
163                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
164                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
165                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
166         }
167         msg->len = 0;                                                                                   // length of data in the payload
168         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
169         msg->sub_id = UNSET_SUBID;
170         msg->mtype = UNSET_MSGTYPE;
171         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
172         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
173         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
174         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
175         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
176         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
177         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
178
179         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
180
181         return msg;
182 }
183
184 /*
185         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
186         transport receive should allocate that on its own.
187 */
188 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
189         size_t  mlen;
190         uta_mhdr_t* hdr;                        // convenience pointer
191         rmr_mbuf_t* msg;
192
193         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
194                 if( msg->tp_buf ) {
195                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
196                 }
197         } else {
198                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
199                         fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
200                         return NULL;                                                    // this used to exit, but that seems wrong
201                 }
202         }
203
204         memset( msg, 0, sizeof( *msg ) );
205
206         msg->sub_id = UNSET_SUBID;
207         msg->mtype = UNSET_MSGTYPE;
208         msg->tp_buf = NULL;
209         msg->header = NULL;
210         msg->len = -1;                                                                                  // no payload; invalid len
211         msg->alloc_len = 0;
212         msg->payload = NULL;
213         msg->xaction = NULL;
214         msg->state = state;
215         msg->flags = 0;
216         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
217
218         return msg;
219 }
220
221 /*
222         This accepts a message with the assumption that only the tp_buf pointer is valid. It
223         sets all of the various header/payload/xaction pointers in the mbuf to the proper
224         spot in the transport layer buffer.  The len in the header is assumed to be the
225         allocated len (a receive buffer that nng created);
226
227         The alen parm is the assumed allocated length; assumed because it's a value likely
228         to have come from si receive and the actual alloc len might be larger, but we
229         can only assume this is the total usable space. Because we are managing a transport
230         header in the first n bytes of the real msg, we must adjust this length down by the
231         size of the tp header (for testing 50 bytes, but this should change to a struct if
232         we adopt this interface).
233
234         This function returns the message with an error state set if it detects that the
235         received message might have been truncated.  Check is done here as the calculation
236         is somewhat based on header version.
237 */
238 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
239         uta_mhdr_t* hdr = NULL;                 // current header
240         uta_v1mhdr_t* v1hdr;                    // version 1 header
241         int ver;
242         int     hlen;                                           // header len to use for a truncation check
243
244         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
245
246         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
247
248         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
249                 ver = 1;
250                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
251         } else {
252                 ver = ntohl( v1hdr->rmr_ver );
253         }
254
255         switch( ver ) {
256                 case 1:
257                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
258                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
259                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
260
261                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
262                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
263                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
264                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
265                         msg->state = RMR_OK;
266                         hlen = sizeof( uta_v1mhdr_t );
267                         break;
268
269                 default:                                                                                                        // current version always lands here
270                         hdr = (uta_mhdr_t *) msg->header;
271                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
272                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
273
274                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
275                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
276                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
277                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
278                         msg->sub_id = ntohl( hdr->sub_id );
279                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
280                         break;
281         }
282
283         if( msg->len > (msg->alloc_len - hlen ) ) {
284                 msg->state = RMR_ERR_TRUNC;
285                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
286         } else {
287                 msg->state = RMR_OK;
288         }
289 }
290
291 /*
292         This will clone a message into a new zero copy buffer and return the cloned message.
293 */
294 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
295         rmr_mbuf_t* nm;                 // new message buffer
296         size_t  mlen;
297         int state;
298         uta_mhdr_t* hdr;
299         uta_v1mhdr_t* v1hdr;
300
301         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
302         if( nm == NULL ) {
303                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
304                 exit( 1 );
305         }
306         memset( nm, 0, sizeof( *nm ) );
307
308         mlen = old_msg->alloc_len;                                                                              // length allocated before
309         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
310                 fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
311                 abort();
312         }
313
314         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
315         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
316         switch( ntohl( v1hdr->rmr_ver ) ) {
317                 case 1:
318                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
319                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
320                         break;
321
322                 default:                                                                                        // current message always caught  here
323                         hdr = nm->header;
324                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
325                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
326                         break;
327         }
328
329         // --- these are all version agnostic -----------------------------------
330         nm->mtype = old_msg->mtype;
331         nm->sub_id = old_msg->sub_id;
332         nm->len = old_msg->len;                                                                 // length of data in the payload
333         nm->alloc_len = mlen;                                                                   // length of allocated payload
334
335         nm->xaction = hdr->xid;                                                                 // reference xaction
336         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
337         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
338         memcpy( nm->payload, old_msg->payload, old_msg->len );
339
340         return nm;
341 }
342
343 /*
344         This will clone a message with a change to the trace area in the header such that
345         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
346         The orignal message will be left unchanged, and a pointer to the new message is returned.
347         It is not possible to realloc buffers and change the data sizes.
348 */
349 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
350         rmr_mbuf_t* nm;                 // new message buffer
351         size_t  mlen;
352         int state;
353         uta_mhdr_t* hdr;
354         uta_v1mhdr_t* v1hdr;
355         int     tr_old_len;                     // tr size in new buffer
356         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
357         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
358
359         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
360         if( nm == NULL ) {
361                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
362                 exit( 1 );
363         }
364         memset( nm, 0, sizeof( *nm ) );
365
366         hdr = old_msg->header;
367         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
368
369         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
370         if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
371
372         tpb_len = mlen + TP_HDR_LEN;
373         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
374                 fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
375                 exit( 1 );
376         }
377         memset( nm->tp_buf, 0, tpb_len );
378         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
379         alen = (int *) nm->tp_buf;
380         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
381
382         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
383
384         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
385         switch( ntohl( v1hdr->rmr_ver ) ) {
386                 case 1:
387                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
388                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
389                         break;
390
391                 default:                                                                                        // current message version always caught  here
392                         hdr = nm->header;
393                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
394                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
395
396                         if( RMR_D1_LEN( hdr )  ) {
397                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
398                         }
399                         if( RMR_D2_LEN( hdr )  ) {
400                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
401                         }
402
403                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
404                         break;
405         }
406
407         // --- these are all version agnostic -----------------------------------
408         nm->mtype = old_msg->mtype;
409         nm->sub_id = old_msg->sub_id;
410         nm->len = old_msg->len;                                                                 // length of data in the payload
411         nm->alloc_len = mlen;                                                                   // length of allocated payload
412
413         nm->xaction = hdr->xid;                                                                 // reference xaction
414         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
415         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
416         memcpy( nm->payload, old_msg->payload, old_msg->len );
417
418         return nm;
419 }
420
421 /*
422         For SI95 based transport all receives are driven through the threaded
423         ring and thus this function should NOT be called. If it is we will panic
424         and abort straight away.
425 */
426 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
427
428 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
429 exit( 1 );
430
431         return NULL;
432 }
433
434 /*
435         Receives a 'raw' message from a non-RMr sender (no header expected). The returned
436         message buffer cannot be used to send, and the length information may or may
437         not be correct (it is set to the length received which might be more than the
438         bytes actually in the payload).
439
440         Mostly this supports the route table collector, but could be extended with an
441         API external function.
442 */
443 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
444         return NULL;
445 /*
446 FIXME: not implemented yet
447         int state;
448         rmr_mbuf_t*     msg = NULL;             // msg received
449         size_t  rsize;                          // nng needs to write back the size received... grrr
450
451         if( old_msg ) {
452                 msg = old_msg;
453         } else {
454                 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
455         }
456
457         //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
458         if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
459                 return msg;
460         }
461         rsize = nng_msg_len( msg->tp_buf );
462
463         // do NOT use ref_tpbuf() here! Must fill these in manually.
464         msg->header = nng_msg_body( msg->tp_buf );
465         msg->len = rsize;                                                       // len is the number of bytes received
466         msg->alloc_len = rsize;
467         msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
468         msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
469         msg->state = RMR_OK;
470         msg->flags = MFL_RAW;
471         msg->payload = msg->header;                                     // payload is the whole thing; no header
472         msg->xaction = NULL;
473
474         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
475
476         return msg;
477 */
478 }
479
480 /*
481         This does the hard work of actually sending the message to the given socket. On success,
482         a new message struct is returned. On error, the original msg is returned with the state
483         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
484         buffer will not be allocated and returned (mostly for call() interal processing since
485         the return message from call() is a received buffer, not a new one).
486
487         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
488         validation has been done prior.
489
490         When msg->state is not ok, this function must set tp_state in the message as some API 
491         fucntions return the message directly and do not propigate errno into the message.
492 */
493 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
494         int state;
495         uta_mhdr_t*     hdr;
496         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
497         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
498         int tot_len;                                                    // total send length (hdr + user data + tp header)
499
500         // future: ensure that application did not overrun the XID buffer; last byte must be 0
501
502         hdr = (uta_mhdr_t *) msg->header;
503         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
504         hdr->sub_id = htonl( msg->sub_id );
505         hdr->plen = htonl( msg->len );
506         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
507
508         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
509                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
510                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
511         }
512
513         if( retries == 0 ) {
514                 spin_retries = 100;
515                 retries++;
516         }
517
518         errno = 0;
519         msg->state = RMR_OK;
520         do {
521                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
522                 *((int*) msg->tp_buf) = tot_len;
523
524                 if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
525                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
526
527                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
528                         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg:  error!! sent state=%d\n", state );
529                         msg->state = state;
530                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
531                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
532                                         retries--;
533                                         if( retries > 0 ) {                                     // only if we'll loop through again
534                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
535                                         }
536                                         spin_retries = 1000;
537                                 }
538                         } else {
539                                 state = 0;                      // don't loop
540                         }
541                 } else {
542                         if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
543                         state = 0;
544                         msg->state = RMR_OK;
545                         hdr = NULL;
546                 }
547         } while( state && retries > 0 );
548
549         if( msg->state == RMR_OK ) {                                                                    // successful send
550                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
551                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
552                 } else {
553                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
554                         return NULL;
555                 }
556         } else {                                                                                        // send failed -- return original message
557                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
558                         errno = EAGAIN;
559                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
560                 } else {
561                         msg->state = RMR_ERR_SENDFAILED;
562                 }
563
564                 if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
565         }
566
567         return msg;
568 }
569
570 /*
571         send message with maximum timeout.
572         Accept a message and send it to an endpoint based on message type.
573         If NNG reports that the send attempt timed out, or should be retried,
574         RMr will retry for approximately max_to microseconds; rounded to the next
575         higher value of 10.
576
577         Allocates a new message buffer for the next send. If a message type has
578         more than one group of endpoints defined, then the message will be sent
579         in round robin fashion to one endpoint in each group.
580
581         An endpoint will be looked up in the route table using the message type and
582         the subscription id. If the subscription id is "UNSET_SUBID", then only the
583         message type is used.  If the initial lookup, with a subid, fails, then a
584         second lookup using just the mtype is tried.
585
586         When msg->state is not OK, this function must set tp_state in the message as 
587         some API fucntions return the message directly and do not propigate errno into 
588         the message.
589
590         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
591                 it will return with an error and errno set to eagain. If the send is
592                 a limited fanout, then the returned status is the status of the last
593                 send attempt.
594
595 */
596 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
597         endpoint_t*     ep;                                     // end point that we're attempting to send to
598         rtable_ent_t*   rte;                    // the route table entry which matches the message key
599         int     nn_sock;                                        // endpoint socket (fd in si case) for send
600         uta_ctx_t*      ctx;
601         int                     group;                          // selected group to get socket for
602         int                     send_again;                     // true if the message must be sent again
603         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
604         int                     sock_ok;                        // got a valid socket from round robin select
605         char*           d1;
606         int                     ok_sends = 0;           // track number of ok sends
607
608         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
609                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
610                 if( msg != NULL ) {
611                         msg->state = RMR_ERR_BADARG;
612                         errno = EINVAL;                                                                                 // must ensure it's not eagain
613                         msg->tp_state = errno;
614                 }
615                 return msg;
616         }
617
618         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
619         if( msg->header == NULL ) {
620                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
621                 msg->state = RMR_ERR_NOHDR;
622                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
623                 msg->tp_state = errno;
624                 return msg;
625         }
626
627         if( max_to < 0 ) {
628                 max_to = ctx->send_retries;             // convert to retries
629         }
630
631         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
632                 if( ctx->flags & CTXFL_WARN ) {
633                         fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
634                 }
635                 msg->state = RMR_ERR_NOENDPT;
636                 errno = ENXIO;                                                                          // must ensure it's not eagain
637                 msg->tp_state = errno;
638                 return msg;                                                                                     // caller can resend (maybe) or free
639         }
640
641         send_again = 1;                                                                                 // force loop entry
642         group = 0;                                                                                              // always start with group 0
643         while( send_again ) {
644                 sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
645
646                 if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
647                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
648
649                 group++;
650
651                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
652                         if( send_again ) {
653                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
654                                 if( clone_m == NULL ) {
655                                         msg->state = RMR_ERR_SENDFAILED;
656                                         errno = ENOMEM;
657                                         msg->tp_state = errno;
658                                         if( ctx->flags & CTXFL_WARN ) {
659                                                 fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
660                                         }
661                                         return msg;
662                                 }
663
664                                 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
665                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
666                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
667         
668                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
669                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
670                                         msg = clone_m;
671                                 } else {
672                                         ok_sends++;
673                                         msg = clone_m;                                                                          // clone will be the next to send
674                                 }
675                         } else {
676                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
677                                 if( DEBUG ) {
678                                         if( msg == NULL ) {
679                                                 fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
680                                         }
681                                 }
682                         }
683
684                         if( ep != NULL && msg != NULL ) {
685                                 switch( msg->state ) {
686                                         case RMR_OK:
687                                                 ep->scounts[EPSC_GOOD]++;
688                                                 break;
689                                 
690                                         case RMR_ERR_RETRY:
691                                                 ep->scounts[EPSC_TRANS]++;
692                                                 break;
693
694                                         default:
695                                                 ep->scounts[EPSC_FAIL]++;
696                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
697                                                 break;
698                                 }
699                         }
700                 } else {
701 /*
702                         if( ctx->flags & CTXFL_WARN ) {
703                                 fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
704                         }
705 */
706                         msg->state = RMR_ERR_NOENDPT;
707                         errno = ENXIO;
708                 }
709         }
710
711         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
712                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
713                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
714                         msg->state = RMR_OK;
715                 }
716         
717                 if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
718         
719                 msg->tp_state = errno;
720         }
721
722         return msg;                                                                     // last message caries the status of last/only send attempt
723 }
724
725
726 /*
727         A generic wrapper to the real send to keep wormhole stuff agnostic.
728         We assume the wormhole function vetted the buffer so we don't have to.
729 */
730 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
731         return send_msg( ctx, msg, ep->nn_sock, -1 );
732 }
733
734 #endif