Fix msg init bug when pulling msg from the poo2
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_n( char *p, char* label, int n ) {
35         int i;
36         int j;
37         int t = 0;
38         int     rows;
39
40
41         if( label ) {
42                 fprintf( stderr, ">>>>> %s p=%p %d bytes\n", label, p, n );
43         }
44         
45         rows = (n/16) + ((n % 16) ? 1 : 0);
46         
47         for( j = 0; j < rows; j++ ) {
48                 fprintf( stderr, "%04x: ", j * 16 );
49
50                 for( i = 0; t < n && i < 16; i++, t++ ) {
51                         fprintf( stderr, "%02x ", (unsigned char) *p );
52                         p++;
53                 }
54                 fprintf( stderr, "\n" );
55         }
56 }
57
58 /*
59         backwards compatability.
60 */
61 static void dump_40( char *p, char* label ) {
62         dump_n( p, label, 40 ); 
63 }
64
65 /*
66         Translates the nng state passed in to one of ours that is suitable to put
67         into the message, and sets errno to something that might be useful.
68         If we don't have a specific RMr state, then we return the default (e.g.
69         receive failed).
70
71         The addition of the connection shut error code to the switch requires
72         that the NNG version at commit e618abf8f3db2a94269a (or after) be
73         used for compiling RMR. 
74 */
75 static inline int xlate_si_state( int state, int def_state ) {
76
77         switch( state ) {
78                 case SI_ERR_NONE:
79                         errno = 0;
80                         state = RMR_OK;
81                         break;
82
83                 default:
84                         state = def_state;
85                         errno = EAGAIN;
86                         break;
87
88                 case SI_ERR_QUEUED:
89                 case SI_ERR_TPORT:
90                 case SI_ERR_UPORT:
91                 case SI_ERR_FORK:
92                 case SI_ERR_HANDLE:
93                 case SI_ERR_SESSID:
94                 case SI_ERR_TP:
95                 case SI_ERR_SHUTD:
96                 case SI_ERR_NOFDS:
97                 case SI_ERR_SIGUSR1:
98                 case SI_ERR_SIGUSR2:
99                 case SI_ERR_DISC:
100                 case SI_ERR_TIMEOUT:
101                 case SI_ERR_ORDREL:
102                 case SI_ERR_SIGALRM:
103                 case SI_ERR_NOMEM:
104                 case SI_ERR_ADDR:
105                         errno  = ENOTSUP;
106                         state = def_state;
107                         break;
108         }
109
110         return state;
111 }
112
113 /*
114         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
115         a new message struct as well. Size is the size of the zc buffer to allocate (not
116         including our header). If size is 0, then the buffer allocated is the size previously
117         allocated (if msg is !nil) or the default size given at initialisation).
118
119         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
120         the context value is used.
121
122         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
123                 are zero copy, however ONLY the message is zero copy. We now allocate and use
124                 nng messages.
125 */
126 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
127         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
128         uta_mhdr_t*     hdr;                    // convenience pointer
129         int                     tr_len;                 // trace data len (default or override)
130         int*            alen;                   // convenience pointer to set allocated len
131
132         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
133
134         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
135         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
136         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
137
138         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
139                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
140                 if( msg == NULL ) {
141                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
142                         return NULL;                                                            // we used to exit -- that seems wrong
143                 }
144                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
145         } else {                                                                // user message or message from the ring
146                 if( mlen > msg->alloc_len ) {           // current allocation is too small
147                         msg->alloc_len = 0;                             // force tp_buffer realloc below
148                         if( msg->tp_buf ) {
149                                 free( msg->tp_buf );
150                                 msg->tp_buf = NULL;
151                         }
152                 } else {
153                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
154                 }
155         }
156
157         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
158
159         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
160                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
161                 abort( );                                                                                       // toss out a core file for this
162         }
163
164 #ifdef DEBUG
165         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
166         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TP_HDR_LEN );                // NOT for production -- debugging eyecatcher
167 #endif
168
169         alen = (int *) msg->tp_buf;
170         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
171
172         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
173         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
174         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
175                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
176                 hdr->sub_id = htonl( UNSET_SUBID );
177                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
178                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
179                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
180                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
181         }
182         msg->len = 0;                                                                                   // length of data in the payload
183         msg->cookie = 0x4942;
184         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
185         msg->sub_id = UNSET_SUBID;
186         msg->mtype = UNSET_MSGTYPE;
187         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
188         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
189         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
190         msg->flags = MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
191         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
192         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
193         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
194
195         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
196
197         return msg;
198 }
199
200 /*
201         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
202         transport receive should allocate that on its own.
203 */
204 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
205         size_t  mlen;
206         uta_mhdr_t* hdr;                        // convenience pointer
207         rmr_mbuf_t* msg;
208
209         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
210                 if( msg->tp_buf ) {
211                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
212                 }
213         } else {
214                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
215                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
216                         return NULL;                                                    // this used to exit, but that seems wrong
217                 }
218         }
219
220         memset( msg, 0, sizeof( *msg ) );
221
222         msg->cookie = 0x4942;
223         msg->sub_id = UNSET_SUBID;
224         msg->mtype = UNSET_MSGTYPE;
225         msg->tp_buf = NULL;
226         msg->header = NULL;
227         msg->len = -1;                                                                                  // no payload; invalid len
228         msg->alloc_len = 0;
229         msg->payload = NULL;
230         msg->xaction = NULL;
231         msg->state = state;
232         msg->flags = 0;
233         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
234
235         return msg;
236 }
237
238 /*
239         This accepts a message with the assumption that only the tp_buf pointer is valid. It
240         sets all of the various header/payload/xaction pointers in the mbuf to the proper
241         spot in the transport layer buffer.  The len in the header is assumed to be the
242         allocated len (a receive buffer that nng created);
243
244         The alen parm is the assumed allocated length; assumed because it's a value likely
245         to have come from si receive and the actual alloc len might be larger, but we
246         can only assume this is the total usable space. Because we are managing a transport
247         header in the first n bytes of the real msg, we must adjust this length down by the
248         size of the tp header (for testing 50 bytes, but this should change to a struct if
249         we adopt this interface).
250
251         This function returns the message with an error state set if it detects that the
252         received message might have been truncated.  Check is done here as the calculation
253         is somewhat based on header version.
254 */
255 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
256         uta_mhdr_t* hdr = NULL;                 // current header
257         uta_v1mhdr_t* v1hdr;                    // version 1 header
258         int ver;
259         int     hlen;                                           // header len to use for a truncation check
260
261         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
262
263         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
264
265         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
266                 ver = 1;
267                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
268         } else {
269                 ver = ntohl( v1hdr->rmr_ver );
270         }
271
272         switch( ver ) {
273                 case 1:
274                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
275                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
276                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
277
278                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
279                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
280                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
281                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
282                         msg->state = RMR_OK;
283                         hlen = sizeof( uta_v1mhdr_t );
284                         break;
285
286                 default:                                                                                                        // current version always lands here
287                         hdr = (uta_mhdr_t *) msg->header;
288                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
289                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
290
291                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
292                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
293                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
294                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
295                         msg->sub_id = ntohl( hdr->sub_id );
296                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
297                         break;
298         }
299
300         if( msg->len > (msg->alloc_len - hlen ) ) {
301                 msg->state = RMR_ERR_TRUNC;
302                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
303         } else {
304                 msg->state = RMR_OK;
305         }
306 }
307
308 /*
309         This will clone a message into a new zero copy buffer and return the cloned message.
310 */
311 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
312         rmr_mbuf_t* nm;                 // new message buffer
313         size_t  mlen;
314         int state;
315         uta_mhdr_t* hdr;
316         uta_v1mhdr_t* v1hdr;
317
318         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
319         if( nm == NULL ) {
320                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
321                 exit( 1 );
322         }
323         memset( nm, 0, sizeof( *nm ) );
324
325         mlen = old_msg->alloc_len;                                                                              // length allocated before
326         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
327                 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
328                 abort();
329         }
330
331         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
332         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
333         switch( ntohl( v1hdr->rmr_ver ) ) {
334                 case 1:
335                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
336                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
337                         break;
338
339                 default:                                                                                        // current message always caught  here
340                         hdr = nm->header;
341                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
342                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
343                         break;
344         }
345
346         // --- these are all version agnostic -----------------------------------
347         nm->mtype = old_msg->mtype;
348         nm->sub_id = old_msg->sub_id;
349         nm->len = old_msg->len;                                                                 // length of data in the payload
350         nm->alloc_len = mlen;                                                                   // length of allocated payload
351
352         nm->xaction = &hdr->xid[0];                                                             // reference xaction
353         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
354         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
355         memcpy( nm->payload, old_msg->payload, old_msg->len );
356
357         return nm;
358 }
359
360 /*
361         This will clone a message with a change to the trace area in the header such that
362         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
363         The orignal message will be left unchanged, and a pointer to the new message is returned.
364         It is not possible to realloc buffers and change the data sizes.
365 */
366 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
367         rmr_mbuf_t* nm;                 // new message buffer
368         size_t  mlen;
369         int state;
370         uta_mhdr_t* hdr;
371         uta_v1mhdr_t* v1hdr;
372         int     tr_old_len;                     // tr size in new buffer
373         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
374         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
375
376         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
377         if( nm == NULL ) {
378                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
379                 exit( 1 );
380         }
381         memset( nm, 0, sizeof( *nm ) );
382
383         hdr = old_msg->header;
384         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
385
386         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
387         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
388
389         tpb_len = mlen + TP_HDR_LEN;
390         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
391                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
392                 exit( 1 );
393         }
394         memset( nm->tp_buf, 0, tpb_len );
395         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
396         alen = (int *) nm->tp_buf;
397         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
398
399         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
400
401         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
402         switch( ntohl( v1hdr->rmr_ver ) ) {
403                 case 1:
404                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
405                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
406                         break;
407
408                 default:                                                                                        // current message version always caught  here
409                         hdr = nm->header;
410                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
411                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
412
413                         if( RMR_D1_LEN( hdr )  ) {
414                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
415                         }
416                         if( RMR_D2_LEN( hdr )  ) {
417                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
418                         }
419
420                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
421                         break;
422         }
423
424         // --- these are all version agnostic -----------------------------------
425         nm->mtype = old_msg->mtype;
426         nm->sub_id = old_msg->sub_id;
427         nm->len = old_msg->len;                                                                 // length of data in the payload
428         nm->alloc_len = mlen;                                                                   // length of allocated payload
429
430         nm->xaction = &hdr->xid[0];                                                             // reference xaction
431         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
432         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
433         memcpy( nm->payload, old_msg->payload, old_msg->len );
434
435         return nm;
436 }
437
438 /*
439         Realloc the message such that the payload is at least payload_len bytes.  
440         The clone and copy options affect what portion of the original payload is copied to
441         the reallocated message, and whether or not the original payload is lost after the
442         reallocation process has finished.
443
444                 copy == true
445                 The entire payload from the original message will be coppied to the reallocated
446                 payload.
447
448                 copy == false
449                 Only the header (preserving return to sender information, message type, etc)
450                 is preserved after reallocation; the payload used lengrh is set to 0 and the
451                 payload is NOT initialised/cleared.
452
453                 clone == true
454                 The orignal message is preserved and a completely new message buffer and payload
455                 are allocated (even if the size given is the same). A pointer to the new message
456                 buffer is returned and it is the user application's responsibility to manage the
457                 old buffer (e.g. free when not needed).
458
459                 clone == false
460                 The old payload will be lost after reallocation. The message buffer pointer which
461                 is returned will likely reference the same structure (don't depend on that).
462                 
463
464         CAUTION:
465         If the message is not a message which was received, the mtype, sub-id, length values in the
466         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
467         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
468         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
469         settings and NOT from the copied RMR header in transport buffer.
470 */
471 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
472         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
473         size_t  mlen;
474         uta_mhdr_t* omhdr;              // old message header
475         int             tr_old_len;             // tr size in new buffer
476         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
477         int             hdr_len = 0;    // length of RMR and transport headers in old msg
478         void*   old_tp_buf;             // pointer to the old tp buffer
479         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
480         int             old_mt;                 // msg type and sub-id from the message passed in
481         int             old_sid;
482         int             old_len;
483         int             old_rfd;                // rts file descriptor from old message
484
485         if( old_msg == NULL || payload_len <= 0 ) {
486                 errno = EINVAL;
487                 return NULL;
488         }
489
490         old_mt = old_msg->mtype;                        // preserve mbuf info
491         old_sid = old_msg->sub_id;
492         old_len = old_msg->len;
493         old_rfd = old_msg->rts_fd;
494
495         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
496
497         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
498                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
499                 return old_msg;
500         }
501
502         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
503         old_tp_buf = old_msg->tp_buf;
504
505         if( clone ) {
506                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
507                 free_tp = 0;
508
509                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
510                 if( nm == NULL ) {
511                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
512                         return NULL;
513                 }
514                 memset( nm, 0, sizeof( *nm ) );
515                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
516         } else {
517                 nm = old_msg;
518         }
519
520         omhdr = old_msg->header;
521         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
522
523         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
524         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
525                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
526                 free( nm );
527                 return NULL;
528         }
529
530         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
531         SET_HDR_LEN( nm->header );
532
533         if( copy ) {                                                                                                                            // if we need to copy the old payload too
534                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
535                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
536         } else {                                                                                                                                        // just need to copy header
537                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
538                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
539         }
540
541         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
542
543         if( !copy ) {
544                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
545                 nm->sub_id = -1;
546                 nm->len = 0;                                            // and len is 0
547         } else {
548                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
549                 nm->mtype = old_mt;
550                 nm->sub_id = old_sid;
551         }
552
553         if( free_tp ) {
554                 free( old_tp_buf );                             // we did not clone, so free b/c no references
555         }
556
557         return nm;
558 }
559
560 /*
561         For SI95 based transport all receives are driven through the threaded
562         ring and thus this function should NOT be called. If it is we will panic
563         and abort straight away.
564 */
565 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
566
567 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
568 exit( 1 );
569
570         return NULL;
571 }
572
573 /*
574         This does the hard work of actually sending the message to the given socket. On success,
575         a new message struct is returned. On error, the original msg is returned with the state
576         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
577         buffer will not be allocated and returned (mostly for call() interal processing since
578         the return message from call() is a received buffer, not a new one).
579
580         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
581         validation has been done prior.
582
583         When msg->state is not ok, this function must set tp_state in the message as some API 
584         fucntions return the message directly and do not propigate errno into the message.
585 */
586 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
587         int state;
588         uta_mhdr_t*     hdr;
589         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
590         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
591         int tot_len;                                                    // total send length (hdr + user data + tp header)
592
593         // future: ensure that application did not overrun the XID buffer; last byte must be 0
594
595         hdr = (uta_mhdr_t *) msg->header;
596         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
597         hdr->sub_id = htonl( msg->sub_id );
598         hdr->plen = htonl( msg->len );
599         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
600
601         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
602                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
603                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
604         }
605
606         if( retries == 0 ) {
607                 spin_retries = 100;
608                 retries++;
609         }
610
611         errno = 0;
612         msg->state = RMR_OK;
613         do {
614                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
615                 if( tot_len > msg->alloc_len ) {
616                         tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
617                 }
618                 *((int*) msg->tp_buf) = tot_len;
619
620                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
621                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
622
623                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
624                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
625                         msg->state = state;
626                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
627                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
628                                         retries--;
629                                         if( retries > 0 ) {                                     // only if we'll loop through again
630                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
631                                         }
632                                         spin_retries = 1000;
633                                 }
634                         } else {
635                                 state = 0;                      // don't loop
636                         }
637                 } else {
638                         if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
639                         state = 0;
640                         msg->state = RMR_OK;
641                         hdr = NULL;
642                 }
643         } while( state && retries > 0 );
644
645         if( msg->state == RMR_OK ) {                                                                    // successful send
646                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
647                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
648                 } else {
649                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
650                         return NULL;
651                 }
652         } else {                                                                                        // send failed -- return original message
653                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
654                         errno = EAGAIN;
655                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
656                 } else {
657                         msg->state = RMR_ERR_SENDFAILED;
658                 }
659
660                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
661         }
662
663         return msg;
664 }
665
666 /*
667         send message with maximum timeout.
668         Accept a message and send it to an endpoint based on message type.
669         If NNG reports that the send attempt timed out, or should be retried,
670         RMr will retry for approximately max_to microseconds; rounded to the next
671         higher value of 10.
672
673         Allocates a new message buffer for the next send. If a message type has
674         more than one group of endpoints defined, then the message will be sent
675         in round robin fashion to one endpoint in each group.
676
677         An endpoint will be looked up in the route table using the message type and
678         the subscription id. If the subscription id is "UNSET_SUBID", then only the
679         message type is used.  If the initial lookup, with a subid, fails, then a
680         second lookup using just the mtype is tried.
681
682         When msg->state is not OK, this function must set tp_state in the message as 
683         some API fucntions return the message directly and do not propigate errno into 
684         the message.
685
686         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
687                 it will return with an error and errno set to eagain. If the send is
688                 a limited fanout, then the returned status is the status of the last
689                 send attempt.
690
691 */
692 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
693         endpoint_t*     ep;                                     // end point that we're attempting to send to
694         rtable_ent_t*   rte;                    // the route table entry which matches the message key
695         int     nn_sock;                                        // endpoint socket (fd in si case) for send
696         uta_ctx_t*      ctx;
697         int                     group;                          // selected group to get socket for
698         int                     send_again;                     // true if the message must be sent again
699         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
700         int                     sock_ok;                        // got a valid socket from round robin select
701         char*           d1;
702         int                     ok_sends = 0;           // track number of ok sends
703
704         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
705                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
706                 if( msg != NULL ) {
707                         msg->state = RMR_ERR_BADARG;
708                         errno = EINVAL;                                                                                 // must ensure it's not eagain
709                         msg->tp_state = errno;
710                 }
711                 return msg;
712         }
713
714         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
715         if( msg->header == NULL ) {
716                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
717                 msg->state = RMR_ERR_NOHDR;
718                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
719                 msg->tp_state = errno;
720                 return msg;
721         }
722
723         if( max_to < 0 ) {
724                 max_to = ctx->send_retries;             // convert to retries
725         }
726
727         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
728                 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
729                 msg->state = RMR_ERR_NOENDPT;
730                 errno = ENXIO;                                                                          // must ensure it's not eagain
731                 msg->tp_state = errno;
732                 return msg;                                                                                     // caller can resend (maybe) or free
733         }
734
735         send_again = 1;                                                                                 // force loop entry
736         group = 0;                                                                                              // always start with group 0
737         while( send_again ) {
738                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
739                         sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
740                 } else {
741                         sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
742                         send_again = 0;
743                 }
744
745                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
746                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
747
748                 group++;
749
750                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
751                         if( send_again ) {
752                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
753                                 if( clone_m == NULL ) {
754                                         msg->state = RMR_ERR_SENDFAILED;
755                                         errno = ENOMEM;
756                                         msg->tp_state = errno;
757                                         rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
758                                         return msg;
759                                 }
760
761                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
762                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
763                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
764         
765                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
766                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
767                                         msg = clone_m;
768                                 } else {
769                                         ok_sends++;
770                                         msg = clone_m;                                                                          // clone will be the next to send
771                                 }
772                         } else {
773                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
774                                 if( DEBUG ) {
775                                         if( msg == NULL ) {
776                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
777                                         }
778                                 }
779                         }
780
781                         if( ep != NULL && msg != NULL ) {
782                                 switch( msg->state ) {
783                                         case RMR_OK:
784                                                 ep->scounts[EPSC_GOOD]++;
785                                                 break;
786                                 
787                                         case RMR_ERR_RETRY:
788                                                 ep->scounts[EPSC_TRANS]++;
789                                                 break;
790
791                                         default:
792                                                 ep->scounts[EPSC_FAIL]++;
793                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
794                                                 break;
795                                 }
796                         }
797                 } else {
798                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
799                         msg->state = RMR_ERR_NOENDPT;
800                         errno = ENXIO;
801                 }
802         }
803
804         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
805                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
806                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
807                         msg->state = RMR_OK;
808                 }
809         
810                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
811         
812                 msg->tp_state = errno;
813         }
814
815         return msg;                                                                     // last message caries the status of last/only send attempt
816 }
817
818
819 /*
820         A generic wrapper to the real send to keep wormhole stuff agnostic.
821         We assume the wormhole function vetted the buffer so we don't have to.
822 */
823 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
824         return send_msg( ctx, msg, ep->nn_sock, -1 );
825 }
826
827 #endif