Add the wormhole call function
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_n( char *p, char* label, int n ) {
35         int i;
36         int j;
37         int t = 0;
38         int     rows;
39
40
41         if( label ) {
42                 fprintf( stderr, ">>>>> %s p=%p %d bytes\n", label, p, n );
43         }
44         
45         rows = (n/16) + ((n % 16) ? 1 : 0);
46         
47         for( j = 0; j < rows; j++ ) {
48                 fprintf( stderr, "%04x: ", j * 16 );
49
50                 for( i = 0; t < n && i < 16; i++, t++ ) {
51                         fprintf( stderr, "%02x ", (unsigned char) *p );
52                         p++;
53                 }
54                 fprintf( stderr, "\n" );
55         }
56 }
57
58 /*
59         backwards compatability.
60 */
61 static void dump_40( char *p, char* label ) {
62         dump_n( p, label, 40 ); 
63 }
64
65 /*
66         Translates the nng state passed in to one of ours that is suitable to put
67         into the message, and sets errno to something that might be useful.
68         If we don't have a specific RMr state, then we return the default (e.g.
69         receive failed).
70
71         The addition of the connection shut error code to the switch requires
72         that the NNG version at commit e618abf8f3db2a94269a (or after) be
73         used for compiling RMR. 
74 */
75 static inline int xlate_si_state( int state, int def_state ) {
76
77         switch( state ) {
78                 case SI_ERR_NONE:
79                         errno = 0;
80                         state = RMR_OK;
81                         break;
82
83                 default:
84                         state = def_state;
85                         errno = EAGAIN;
86                         break;
87
88                 case SI_ERR_QUEUED:
89                 case SI_ERR_TPORT:
90                 case SI_ERR_UPORT:
91                 case SI_ERR_FORK:
92                 case SI_ERR_HANDLE:
93                 case SI_ERR_SESSID:
94                 case SI_ERR_TP:
95                 case SI_ERR_SHUTD:
96                 case SI_ERR_NOFDS:
97                 case SI_ERR_SIGUSR1:
98                 case SI_ERR_SIGUSR2:
99                 case SI_ERR_DISC:
100                 case SI_ERR_TIMEOUT:
101                 case SI_ERR_ORDREL:
102                 case SI_ERR_SIGALRM:
103                 case SI_ERR_NOMEM:
104                 case SI_ERR_ADDR:
105                         errno  = ENOTSUP;
106                         state = def_state;
107                         break;
108         }
109
110         return state;
111 }
112
113 /*
114         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
115         a new message struct as well. Size is the size of the zc buffer to allocate (not
116         including our header). If size is 0, then the buffer allocated is the size previously
117         allocated (if msg is !nil) or the default size given at initialisation).
118
119         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
120         the context value is used.
121
122         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
123                 are zero copy, however ONLY the message is zero copy. We now allocate and use
124                 nng messages.
125 */
126 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
127         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
128         uta_mhdr_t*     hdr;                    // convenience pointer
129         int                     tr_len;                 // trace data len (default or override)
130         int*            alen;                   // convenience pointer to set allocated len
131
132         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
133
134         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
135         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
136         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
137
138         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
139                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
140                 if( msg == NULL ) {
141                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
142                         return NULL;                                                            // we used to exit -- that seems wrong
143                 }
144                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
145         } else {                                                                // user message or message from the ring
146                 if( mlen > msg->alloc_len ) {           // current allocation is too small
147                         msg->alloc_len = 0;                             // force tp_buffer realloc below
148                         if( msg->tp_buf ) {
149                                 free( msg->tp_buf );
150                         }
151                 } else {
152                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
153                 }
154         }
155
156         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
157
158         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
159                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
160                 abort( );                                                                                       // toss out a core file for this
161         }
162
163 /*
164         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
165         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TP_HDR_LEN );                // NOT for production -- debugging eyecatcher
166 */
167         alen = (int *) msg->tp_buf;
168         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
169
170         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
171         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
172         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
173                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
174                 hdr->sub_id = htonl( UNSET_SUBID );
175                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
176                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
177                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
178                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
179         }
180         msg->len = 0;                                                                                   // length of data in the payload
181         msg->cookie = 0x4942;
182         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
183         msg->sub_id = UNSET_SUBID;
184         msg->mtype = UNSET_MSGTYPE;
185         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
186         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
187         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
188         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
189         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
190         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
191         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
192
193         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
194
195         return msg;
196 }
197
198 /*
199         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
200         transport receive should allocate that on its own.
201 */
202 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
203         size_t  mlen;
204         uta_mhdr_t* hdr;                        // convenience pointer
205         rmr_mbuf_t* msg;
206
207         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
208                 if( msg->tp_buf ) {
209                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
210                 }
211         } else {
212                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
213                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
214                         return NULL;                                                    // this used to exit, but that seems wrong
215                 }
216         }
217
218         memset( msg, 0, sizeof( *msg ) );
219
220         msg->cookie = 0x4942;
221         msg->sub_id = UNSET_SUBID;
222         msg->mtype = UNSET_MSGTYPE;
223         msg->tp_buf = NULL;
224         msg->header = NULL;
225         msg->len = -1;                                                                                  // no payload; invalid len
226         msg->alloc_len = 0;
227         msg->payload = NULL;
228         msg->xaction = NULL;
229         msg->state = state;
230         msg->flags = 0;
231         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
232
233         return msg;
234 }
235
236 /*
237         This accepts a message with the assumption that only the tp_buf pointer is valid. It
238         sets all of the various header/payload/xaction pointers in the mbuf to the proper
239         spot in the transport layer buffer.  The len in the header is assumed to be the
240         allocated len (a receive buffer that nng created);
241
242         The alen parm is the assumed allocated length; assumed because it's a value likely
243         to have come from si receive and the actual alloc len might be larger, but we
244         can only assume this is the total usable space. Because we are managing a transport
245         header in the first n bytes of the real msg, we must adjust this length down by the
246         size of the tp header (for testing 50 bytes, but this should change to a struct if
247         we adopt this interface).
248
249         This function returns the message with an error state set if it detects that the
250         received message might have been truncated.  Check is done here as the calculation
251         is somewhat based on header version.
252 */
253 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
254         uta_mhdr_t* hdr = NULL;                 // current header
255         uta_v1mhdr_t* v1hdr;                    // version 1 header
256         int ver;
257         int     hlen;                                           // header len to use for a truncation check
258
259         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
260
261         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
262
263         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
264                 ver = 1;
265                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
266         } else {
267                 ver = ntohl( v1hdr->rmr_ver );
268         }
269
270         switch( ver ) {
271                 case 1:
272                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
273                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
274                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
275
276                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
277                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
278                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
279                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
280                         msg->state = RMR_OK;
281                         hlen = sizeof( uta_v1mhdr_t );
282                         break;
283
284                 default:                                                                                                        // current version always lands here
285                         hdr = (uta_mhdr_t *) msg->header;
286                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
287                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
288
289                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
290                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
291                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
292                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
293                         msg->sub_id = ntohl( hdr->sub_id );
294                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
295                         break;
296         }
297
298         if( msg->len > (msg->alloc_len - hlen ) ) {
299                 msg->state = RMR_ERR_TRUNC;
300                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
301         } else {
302                 msg->state = RMR_OK;
303         }
304 }
305
306 /*
307         This will clone a message into a new zero copy buffer and return the cloned message.
308 */
309 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
310         rmr_mbuf_t* nm;                 // new message buffer
311         size_t  mlen;
312         int state;
313         uta_mhdr_t* hdr;
314         uta_v1mhdr_t* v1hdr;
315
316         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
317         if( nm == NULL ) {
318                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
319                 exit( 1 );
320         }
321         memset( nm, 0, sizeof( *nm ) );
322
323         mlen = old_msg->alloc_len;                                                                              // length allocated before
324         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
325                 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
326                 abort();
327         }
328
329         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
330         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
331         switch( ntohl( v1hdr->rmr_ver ) ) {
332                 case 1:
333                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
334                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
335                         break;
336
337                 default:                                                                                        // current message always caught  here
338                         hdr = nm->header;
339                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
340                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
341                         break;
342         }
343
344         // --- these are all version agnostic -----------------------------------
345         nm->mtype = old_msg->mtype;
346         nm->sub_id = old_msg->sub_id;
347         nm->len = old_msg->len;                                                                 // length of data in the payload
348         nm->alloc_len = mlen;                                                                   // length of allocated payload
349
350         nm->xaction = &hdr->xid[0];                                                             // reference xaction
351         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
352         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
353         memcpy( nm->payload, old_msg->payload, old_msg->len );
354
355         return nm;
356 }
357
358 /*
359         This will clone a message with a change to the trace area in the header such that
360         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
361         The orignal message will be left unchanged, and a pointer to the new message is returned.
362         It is not possible to realloc buffers and change the data sizes.
363 */
364 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
365         rmr_mbuf_t* nm;                 // new message buffer
366         size_t  mlen;
367         int state;
368         uta_mhdr_t* hdr;
369         uta_v1mhdr_t* v1hdr;
370         int     tr_old_len;                     // tr size in new buffer
371         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
372         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
373
374         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
375         if( nm == NULL ) {
376                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
377                 exit( 1 );
378         }
379         memset( nm, 0, sizeof( *nm ) );
380
381         hdr = old_msg->header;
382         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
383
384         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
385         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
386
387         tpb_len = mlen + TP_HDR_LEN;
388         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
389                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
390                 exit( 1 );
391         }
392         memset( nm->tp_buf, 0, tpb_len );
393         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
394         alen = (int *) nm->tp_buf;
395         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
396
397         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
398
399         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
400         switch( ntohl( v1hdr->rmr_ver ) ) {
401                 case 1:
402                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
403                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
404                         break;
405
406                 default:                                                                                        // current message version always caught  here
407                         hdr = nm->header;
408                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
409                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
410
411                         if( RMR_D1_LEN( hdr )  ) {
412                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
413                         }
414                         if( RMR_D2_LEN( hdr )  ) {
415                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
416                         }
417
418                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
419                         break;
420         }
421
422         // --- these are all version agnostic -----------------------------------
423         nm->mtype = old_msg->mtype;
424         nm->sub_id = old_msg->sub_id;
425         nm->len = old_msg->len;                                                                 // length of data in the payload
426         nm->alloc_len = mlen;                                                                   // length of allocated payload
427
428         nm->xaction = &hdr->xid[0];                                                             // reference xaction
429         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
430         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
431         memcpy( nm->payload, old_msg->payload, old_msg->len );
432
433         return nm;
434 }
435
436 /*
437         Realloc the message such that the payload is at least payload_len bytes.  
438         The clone and copy options affect what portion of the original payload is copied to
439         the reallocated message, and whether or not the original payload is lost after the
440         reallocation process has finished.
441
442                 copy == true
443                 The entire payload from the original message will be coppied to the reallocated
444                 payload.
445
446                 copy == false
447                 Only the header (preserving return to sender information, message type, etc)
448                 is preserved after reallocation; the payload used lengrh is set to 0 and the
449                 payload is NOT initialised/cleared.
450
451                 clone == true
452                 The orignal message is preserved and a completely new message buffer and payload
453                 are allocated (even if the size given is the same). A pointer to the new message
454                 buffer is returned and it is the user application's responsibility to manage the
455                 old buffer (e.g. free when not needed).
456
457                 clone == false
458                 The old payload will be lost after reallocation. The message buffer pointer which
459                 is returned will likely reference the same structure (don't depend on that).
460                 
461
462         CAUTION:
463         If the message is not a message which was received, the mtype, sub-id, length values in the
464         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
465         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
466         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
467         settings and NOT from the copied RMR header in transport buffer.
468 */
469 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
470         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
471         size_t  mlen;
472         uta_mhdr_t* omhdr;              // old message header
473         int             tr_old_len;             // tr size in new buffer
474         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
475         int             hdr_len = 0;    // length of RMR and transport headers in old msg
476         void*   old_tp_buf;             // pointer to the old tp buffer
477         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
478         int             old_mt;                 // msg type and sub-id from the message passed in
479         int             old_sid;
480         int             old_len;
481         int             old_rfd;                // rts file descriptor from old message
482
483         if( old_msg == NULL || payload_len <= 0 ) {
484                 errno = EINVAL;
485                 return NULL;
486         }
487
488         old_mt = old_msg->mtype;                        // preserve mbuf info
489         old_sid = old_msg->sub_id;
490         old_len = old_msg->len;
491         old_rfd = old_msg->rts_fd;
492
493         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
494
495         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
496                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
497                 return old_msg;
498         }
499
500         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
501         old_tp_buf = old_msg->tp_buf;
502
503         if( clone ) {
504                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
505                 free_tp = 0;
506
507                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
508                 if( nm == NULL ) {
509                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
510                         return NULL;
511                 }
512                 memset( nm, 0, sizeof( *nm ) );
513                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
514         } else {
515                 nm = old_msg;
516         }
517
518         omhdr = old_msg->header;
519         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
520
521         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
522         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
523                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
524                 free( nm );
525                 return NULL;
526         }
527
528         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
529         SET_HDR_LEN( nm->header );
530
531         if( copy ) {                                                                                                                            // if we need to copy the old payload too
532                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
533                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
534         } else {                                                                                                                                        // just need to copy header
535                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
536                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
537         }
538
539         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
540
541         if( !copy ) {
542                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
543                 nm->sub_id = -1;
544                 nm->len = 0;                                            // and len is 0
545         } else {
546                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
547                 nm->mtype = old_mt;
548                 nm->sub_id = old_sid;
549         }
550
551         if( free_tp ) {
552                 free( old_tp_buf );                             // we did not clone, so free b/c no references
553         }
554
555         return nm;
556 }
557
558 /*
559         For SI95 based transport all receives are driven through the threaded
560         ring and thus this function should NOT be called. If it is we will panic
561         and abort straight away.
562 */
563 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
564
565 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
566 exit( 1 );
567
568         return NULL;
569 }
570
571 /*
572         This does the hard work of actually sending the message to the given socket. On success,
573         a new message struct is returned. On error, the original msg is returned with the state
574         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
575         buffer will not be allocated and returned (mostly for call() interal processing since
576         the return message from call() is a received buffer, not a new one).
577
578         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
579         validation has been done prior.
580
581         When msg->state is not ok, this function must set tp_state in the message as some API 
582         fucntions return the message directly and do not propigate errno into the message.
583 */
584 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
585         int state;
586         uta_mhdr_t*     hdr;
587         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
588         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
589         int tot_len;                                                    // total send length (hdr + user data + tp header)
590
591         // future: ensure that application did not overrun the XID buffer; last byte must be 0
592
593         hdr = (uta_mhdr_t *) msg->header;
594         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
595         hdr->sub_id = htonl( msg->sub_id );
596         hdr->plen = htonl( msg->len );
597         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
598
599         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
600                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
601                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
602         }
603
604         if( retries == 0 ) {
605                 spin_retries = 100;
606                 retries++;
607         }
608
609         errno = 0;
610         msg->state = RMR_OK;
611         do {
612                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
613                 if( tot_len > msg->alloc_len ) {
614                         tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
615                 }
616                 *((int*) msg->tp_buf) = tot_len;
617
618                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
619                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
620
621                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
622                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
623                         msg->state = state;
624                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
625                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
626                                         retries--;
627                                         if( retries > 0 ) {                                     // only if we'll loop through again
628                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
629                                         }
630                                         spin_retries = 1000;
631                                 }
632                         } else {
633                                 state = 0;                      // don't loop
634                         }
635                 } else {
636                         if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
637                         state = 0;
638                         msg->state = RMR_OK;
639                         hdr = NULL;
640                 }
641         } while( state && retries > 0 );
642
643         if( msg->state == RMR_OK ) {                                                                    // successful send
644                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
645                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
646                 } else {
647                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
648                         return NULL;
649                 }
650         } else {                                                                                        // send failed -- return original message
651                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
652                         errno = EAGAIN;
653                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
654                 } else {
655                         msg->state = RMR_ERR_SENDFAILED;
656                 }
657
658                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
659         }
660
661         return msg;
662 }
663
664 /*
665         send message with maximum timeout.
666         Accept a message and send it to an endpoint based on message type.
667         If NNG reports that the send attempt timed out, or should be retried,
668         RMr will retry for approximately max_to microseconds; rounded to the next
669         higher value of 10.
670
671         Allocates a new message buffer for the next send. If a message type has
672         more than one group of endpoints defined, then the message will be sent
673         in round robin fashion to one endpoint in each group.
674
675         An endpoint will be looked up in the route table using the message type and
676         the subscription id. If the subscription id is "UNSET_SUBID", then only the
677         message type is used.  If the initial lookup, with a subid, fails, then a
678         second lookup using just the mtype is tried.
679
680         When msg->state is not OK, this function must set tp_state in the message as 
681         some API fucntions return the message directly and do not propigate errno into 
682         the message.
683
684         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
685                 it will return with an error and errno set to eagain. If the send is
686                 a limited fanout, then the returned status is the status of the last
687                 send attempt.
688
689 */
690 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
691         endpoint_t*     ep;                                     // end point that we're attempting to send to
692         rtable_ent_t*   rte;                    // the route table entry which matches the message key
693         int     nn_sock;                                        // endpoint socket (fd in si case) for send
694         uta_ctx_t*      ctx;
695         int                     group;                          // selected group to get socket for
696         int                     send_again;                     // true if the message must be sent again
697         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
698         int                     sock_ok;                        // got a valid socket from round robin select
699         char*           d1;
700         int                     ok_sends = 0;           // track number of ok sends
701
702         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
703                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
704                 if( msg != NULL ) {
705                         msg->state = RMR_ERR_BADARG;
706                         errno = EINVAL;                                                                                 // must ensure it's not eagain
707                         msg->tp_state = errno;
708                 }
709                 return msg;
710         }
711
712         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
713         if( msg->header == NULL ) {
714                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
715                 msg->state = RMR_ERR_NOHDR;
716                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
717                 msg->tp_state = errno;
718                 return msg;
719         }
720
721         if( max_to < 0 ) {
722                 max_to = ctx->send_retries;             // convert to retries
723         }
724
725         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
726                 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
727                 msg->state = RMR_ERR_NOENDPT;
728                 errno = ENXIO;                                                                          // must ensure it's not eagain
729                 msg->tp_state = errno;
730                 return msg;                                                                                     // caller can resend (maybe) or free
731         }
732
733         send_again = 1;                                                                                 // force loop entry
734         group = 0;                                                                                              // always start with group 0
735         while( send_again ) {
736                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
737                         sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
738                 } else {
739                         sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
740                         send_again = 0;
741                 }
742
743                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
744                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
745
746                 group++;
747
748                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
749                         if( send_again ) {
750                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
751                                 if( clone_m == NULL ) {
752                                         msg->state = RMR_ERR_SENDFAILED;
753                                         errno = ENOMEM;
754                                         msg->tp_state = errno;
755                                         rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
756                                         return msg;
757                                 }
758
759                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
760                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
761                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
762         
763                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
764                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
765                                         msg = clone_m;
766                                 } else {
767                                         ok_sends++;
768                                         msg = clone_m;                                                                          // clone will be the next to send
769                                 }
770                         } else {
771                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
772                                 if( DEBUG ) {
773                                         if( msg == NULL ) {
774                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
775                                         }
776                                 }
777                         }
778
779                         if( ep != NULL && msg != NULL ) {
780                                 switch( msg->state ) {
781                                         case RMR_OK:
782                                                 ep->scounts[EPSC_GOOD]++;
783                                                 break;
784                                 
785                                         case RMR_ERR_RETRY:
786                                                 ep->scounts[EPSC_TRANS]++;
787                                                 break;
788
789                                         default:
790                                                 ep->scounts[EPSC_FAIL]++;
791                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
792                                                 break;
793                                 }
794                         }
795                 } else {
796                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
797                         msg->state = RMR_ERR_NOENDPT;
798                         errno = ENXIO;
799                 }
800         }
801
802         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
803                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
804                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
805                         msg->state = RMR_OK;
806                 }
807         
808                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
809         
810                 msg->tp_state = errno;
811         }
812
813         return msg;                                                                     // last message caries the status of last/only send attempt
814 }
815
816
817 /*
818         A generic wrapper to the real send to keep wormhole stuff agnostic.
819         We assume the wormhole function vetted the buffer so we don't have to.
820 */
821 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
822         return send_msg( ctx, msg, ep->nn_sock, -1 );
823 }
824
825 #endif