Address code analysis issues
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_40( char *p, char* label ) {
35         int i;
36
37         if( label )
38                 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
39
40         for( i = 0; i < 40; i++ ) {
41                 fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
42         }
43         fprintf( stderr, "\n" );
44 }
45
46 /*
47         Translates the nng state passed in to one of ours that is suitable to put
48         into the message, and sets errno to something that might be useful.
49         If we don't have a specific RMr state, then we return the default (e.g.
50         receive failed).
51
52         The addition of the connection shut error code to the switch requires
53         that the NNG version at commit e618abf8f3db2a94269a (or after) be
54         used for compiling RMR. 
55 */
56 static inline int xlate_si_state( int state, int def_state ) {
57
58         switch( state ) {
59                 case SI_ERR_NONE:
60                         errno = 0;
61                         state = RMR_OK;
62                         break;
63
64                 default:
65                         state = def_state;
66                         errno = EAGAIN;
67                         break;
68
69                 case SI_ERR_QUEUED:
70                 case SI_ERR_TPORT:
71                 case SI_ERR_UPORT:
72                 case SI_ERR_FORK:
73                 case SI_ERR_HANDLE:
74                 case SI_ERR_SESSID:
75                 case SI_ERR_TP:
76                 case SI_ERR_SHUTD:
77                 case SI_ERR_NOFDS:
78                 case SI_ERR_SIGUSR1:
79                 case SI_ERR_SIGUSR2:
80                 case SI_ERR_DISC:
81                 case SI_ERR_TIMEOUT:
82                 case SI_ERR_ORDREL:
83                 case SI_ERR_SIGALRM:
84                 case SI_ERR_NOMEM:
85                 case SI_ERR_ADDR:
86                         errno  = ENOTSUP;
87                         state = def_state;
88                         break;
89         }
90
91         return state;
92 }
93
94 /*
95         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
96         a new message struct as well. Size is the size of the zc buffer to allocate (not
97         including our header). If size is 0, then the buffer allocated is the size previously
98         allocated (if msg is !nil) or the default size given at initialisation).
99
100         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
101         the context value is used.
102
103         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
104                 are zero copy, however ONLY the message is zero copy. We now allocate and use
105                 nng messages.
106 */
107 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
108         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
109         uta_mhdr_t*     hdr;                    // convenience pointer
110         int                     tr_len;                 // trace data len (default or override)
111         int*            alen;                   // convenience pointer to set allocated len
112
113         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
114
115         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
116         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
117         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
118
119         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
120                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
121                 if( msg == NULL ) {
122                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
123                         return NULL;                                                            // we used to exit -- that seems wrong
124                 }
125                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
126         } else {                                                                // user message or message from the ring
127                 if( mlen > msg->alloc_len ) {           // current allocation is too small
128                         msg->alloc_len = 0;                             // force tp_buffer realloc below
129                         if( msg->tp_buf ) {
130                                 free( msg->tp_buf );
131                         }
132                 } else {
133                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
134                 }
135         }
136
137         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
138
139         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
140                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
141                 abort( );                                                                                       // toss out a core file for this
142         }
143
144 /*
145         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
146         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TP_HDR_LEN );                // NOT for production -- debugging eyecatcher
147 */
148         alen = (int *) msg->tp_buf;
149         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
150
151         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
152         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
153         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
154                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
155                 hdr->sub_id = htonl( UNSET_SUBID );
156                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
157                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
158                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
159                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
160         }
161         msg->len = 0;                                                                                   // length of data in the payload
162         msg->cookie = 0x4942;
163         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
164         msg->sub_id = UNSET_SUBID;
165         msg->mtype = UNSET_MSGTYPE;
166         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
167         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
168         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
169         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
170         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
171         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
172         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
173
174         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
175
176         return msg;
177 }
178
179 /*
180         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
181         transport receive should allocate that on its own.
182 */
183 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
184         size_t  mlen;
185         uta_mhdr_t* hdr;                        // convenience pointer
186         rmr_mbuf_t* msg;
187
188         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
189                 if( msg->tp_buf ) {
190                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
191                 }
192         } else {
193                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
194                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
195                         return NULL;                                                    // this used to exit, but that seems wrong
196                 }
197         }
198
199         memset( msg, 0, sizeof( *msg ) );
200
201         msg->cookie = 0x4942;
202         msg->sub_id = UNSET_SUBID;
203         msg->mtype = UNSET_MSGTYPE;
204         msg->tp_buf = NULL;
205         msg->header = NULL;
206         msg->len = -1;                                                                                  // no payload; invalid len
207         msg->alloc_len = 0;
208         msg->payload = NULL;
209         msg->xaction = NULL;
210         msg->state = state;
211         msg->flags = 0;
212         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
213
214         return msg;
215 }
216
217 /*
218         This accepts a message with the assumption that only the tp_buf pointer is valid. It
219         sets all of the various header/payload/xaction pointers in the mbuf to the proper
220         spot in the transport layer buffer.  The len in the header is assumed to be the
221         allocated len (a receive buffer that nng created);
222
223         The alen parm is the assumed allocated length; assumed because it's a value likely
224         to have come from si receive and the actual alloc len might be larger, but we
225         can only assume this is the total usable space. Because we are managing a transport
226         header in the first n bytes of the real msg, we must adjust this length down by the
227         size of the tp header (for testing 50 bytes, but this should change to a struct if
228         we adopt this interface).
229
230         This function returns the message with an error state set if it detects that the
231         received message might have been truncated.  Check is done here as the calculation
232         is somewhat based on header version.
233 */
234 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
235         uta_mhdr_t* hdr = NULL;                 // current header
236         uta_v1mhdr_t* v1hdr;                    // version 1 header
237         int ver;
238         int     hlen;                                           // header len to use for a truncation check
239
240         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
241
242         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
243
244         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
245                 ver = 1;
246                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
247         } else {
248                 ver = ntohl( v1hdr->rmr_ver );
249         }
250
251         switch( ver ) {
252                 case 1:
253                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
254                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
255                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
256
257                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
258                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
259                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
260                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
261                         msg->state = RMR_OK;
262                         hlen = sizeof( uta_v1mhdr_t );
263                         break;
264
265                 default:                                                                                                        // current version always lands here
266                         hdr = (uta_mhdr_t *) msg->header;
267                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
268                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
269
270                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
271                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
272                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
273                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
274                         msg->sub_id = ntohl( hdr->sub_id );
275                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
276                         break;
277         }
278
279         if( msg->len > (msg->alloc_len - hlen ) ) {
280                 msg->state = RMR_ERR_TRUNC;
281                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
282         } else {
283                 msg->state = RMR_OK;
284         }
285 }
286
287 /*
288         This will clone a message into a new zero copy buffer and return the cloned message.
289 */
290 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
291         rmr_mbuf_t* nm;                 // new message buffer
292         size_t  mlen;
293         int state;
294         uta_mhdr_t* hdr;
295         uta_v1mhdr_t* v1hdr;
296
297         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
298         if( nm == NULL ) {
299                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
300                 exit( 1 );
301         }
302         memset( nm, 0, sizeof( *nm ) );
303
304         mlen = old_msg->alloc_len;                                                                              // length allocated before
305         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
306                 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
307                 abort();
308         }
309
310         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
311         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
312         switch( ntohl( v1hdr->rmr_ver ) ) {
313                 case 1:
314                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
315                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
316                         break;
317
318                 default:                                                                                        // current message always caught  here
319                         hdr = nm->header;
320                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
321                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
322                         break;
323         }
324
325         // --- these are all version agnostic -----------------------------------
326         nm->mtype = old_msg->mtype;
327         nm->sub_id = old_msg->sub_id;
328         nm->len = old_msg->len;                                                                 // length of data in the payload
329         nm->alloc_len = mlen;                                                                   // length of allocated payload
330
331         nm->xaction = &hdr->xid[0];                                                             // reference xaction
332         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
333         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
334         memcpy( nm->payload, old_msg->payload, old_msg->len );
335
336         return nm;
337 }
338
339 /*
340         This will clone a message with a change to the trace area in the header such that
341         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
342         The orignal message will be left unchanged, and a pointer to the new message is returned.
343         It is not possible to realloc buffers and change the data sizes.
344 */
345 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
346         rmr_mbuf_t* nm;                 // new message buffer
347         size_t  mlen;
348         int state;
349         uta_mhdr_t* hdr;
350         uta_v1mhdr_t* v1hdr;
351         int     tr_old_len;                     // tr size in new buffer
352         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
353         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
354
355         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
356         if( nm == NULL ) {
357                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
358                 exit( 1 );
359         }
360         memset( nm, 0, sizeof( *nm ) );
361
362         hdr = old_msg->header;
363         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
364
365         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
366         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
367
368         tpb_len = mlen + TP_HDR_LEN;
369         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
370                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
371                 exit( 1 );
372         }
373         memset( nm->tp_buf, 0, tpb_len );
374         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
375         alen = (int *) nm->tp_buf;
376         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
377
378         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
379
380         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
381         switch( ntohl( v1hdr->rmr_ver ) ) {
382                 case 1:
383                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
384                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
385                         break;
386
387                 default:                                                                                        // current message version always caught  here
388                         hdr = nm->header;
389                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
390                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
391
392                         if( RMR_D1_LEN( hdr )  ) {
393                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
394                         }
395                         if( RMR_D2_LEN( hdr )  ) {
396                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
397                         }
398
399                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
400                         break;
401         }
402
403         // --- these are all version agnostic -----------------------------------
404         nm->mtype = old_msg->mtype;
405         nm->sub_id = old_msg->sub_id;
406         nm->len = old_msg->len;                                                                 // length of data in the payload
407         nm->alloc_len = mlen;                                                                   // length of allocated payload
408
409         nm->xaction = &hdr->xid[0];                                                             // reference xaction
410         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
411         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
412         memcpy( nm->payload, old_msg->payload, old_msg->len );
413
414         return nm;
415 }
416
417 /*
418         Realloc the message such that the payload is at least payload_len bytes.  
419         The clone and copy options affect what portion of the original payload is copied to
420         the reallocated message, and whether or not the original payload is lost after the
421         reallocation process has finished.
422
423                 copy == true
424                 The entire payload from the original message will be coppied to the reallocated
425                 payload.
426
427                 copy == false
428                 Only the header (preserving return to sender information, message type, etc)
429                 is preserved after reallocation; the payload used lengrh is set to 0 and the
430                 payload is NOT initialised/cleared.
431
432                 clone == true
433                 The orignal message is preserved and a completely new message buffer and payload
434                 are allocated (even if the size given is the same). A pointer to the new message
435                 buffer is returned and it is the user application's responsibility to manage the
436                 old buffer (e.g. free when not needed).
437
438                 clone == false
439                 The old payload will be lost after reallocation. The message buffer pointer which
440                 is returned will likely reference the same structure (don't depend on that).
441                 
442
443         CAUTION:
444         If the message is not a message which was received, the mtype, sub-id, length values in the
445         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
446         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
447         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
448         settings and NOT from the copied RMR header in transport buffer.
449 */
450 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
451         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
452         size_t  mlen;
453         uta_mhdr_t* omhdr;              // old message header
454         int             tr_old_len;             // tr size in new buffer
455         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
456         int             hdr_len = 0;    // length of RMR and transport headers in old msg
457         void*   old_tp_buf;             // pointer to the old tp buffer
458         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
459         int             old_mt;                 // msg type and sub-id from the message passed in
460         int             old_sid;
461         int             old_len;
462         int             old_rfd;                // rts file descriptor from old message
463
464         if( old_msg == NULL || payload_len <= 0 ) {
465                 errno = EINVAL;
466                 return NULL;
467         }
468
469         old_mt = old_msg->mtype;                        // preserve mbuf info
470         old_sid = old_msg->sub_id;
471         old_len = old_msg->len;
472         old_rfd = old_msg->rts_fd;
473
474         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
475
476         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
477                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
478                 return old_msg;
479         }
480
481         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
482         old_tp_buf = old_msg->tp_buf;
483
484         if( clone ) {
485                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
486                 free_tp = 0;
487
488                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
489                 if( nm == NULL ) {
490                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
491                         return NULL;
492                 }
493                 memset( nm, 0, sizeof( *nm ) );
494                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
495         } else {
496                 nm = old_msg;
497         }
498
499         omhdr = old_msg->header;
500         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
501
502         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
503         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
504                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
505                 free( nm );
506                 return NULL;
507         }
508
509         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
510         SET_HDR_LEN( nm->header );
511
512         if( copy ) {                                                                                                                            // if we need to copy the old payload too
513                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
514                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
515         } else {                                                                                                                                        // just need to copy header
516                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
517                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
518         }
519
520         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
521
522         if( !copy ) {
523                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
524                 nm->sub_id = -1;
525                 nm->len = 0;                                            // and len is 0
526         } else {
527                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
528                 nm->mtype = old_mt;
529                 nm->sub_id = old_sid;
530         }
531
532         if( free_tp ) {
533                 free( old_tp_buf );                             // we did not clone, so free b/c no references
534         }
535
536         return nm;
537 }
538
539 /*
540         For SI95 based transport all receives are driven through the threaded
541         ring and thus this function should NOT be called. If it is we will panic
542         and abort straight away.
543 */
544 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
545
546 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
547 exit( 1 );
548
549         return NULL;
550 }
551
552 /*
553         This does the hard work of actually sending the message to the given socket. On success,
554         a new message struct is returned. On error, the original msg is returned with the state
555         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
556         buffer will not be allocated and returned (mostly for call() interal processing since
557         the return message from call() is a received buffer, not a new one).
558
559         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
560         validation has been done prior.
561
562         When msg->state is not ok, this function must set tp_state in the message as some API 
563         fucntions return the message directly and do not propigate errno into the message.
564 */
565 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
566         int state;
567         uta_mhdr_t*     hdr;
568         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
569         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
570         int tot_len;                                                    // total send length (hdr + user data + tp header)
571
572         // future: ensure that application did not overrun the XID buffer; last byte must be 0
573
574         hdr = (uta_mhdr_t *) msg->header;
575         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
576         hdr->sub_id = htonl( msg->sub_id );
577         hdr->plen = htonl( msg->len );
578         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
579
580         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
581                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
582                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
583         }
584
585         if( retries == 0 ) {
586                 spin_retries = 100;
587                 retries++;
588         }
589
590         errno = 0;
591         msg->state = RMR_OK;
592         do {
593                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
594                 if( tot_len > msg->alloc_len ) {
595                         tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
596                 }
597                 *((int*) msg->tp_buf) = tot_len;
598
599                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
600                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
601
602                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
603                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
604                         msg->state = state;
605                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
606                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
607                                         retries--;
608                                         if( retries > 0 ) {                                     // only if we'll loop through again
609                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
610                                         }
611                                         spin_retries = 1000;
612                                 }
613                         } else {
614                                 state = 0;                      // don't loop
615                         }
616                 } else {
617                         if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
618                         state = 0;
619                         msg->state = RMR_OK;
620                         hdr = NULL;
621                 }
622         } while( state && retries > 0 );
623
624         if( msg->state == RMR_OK ) {                                                                    // successful send
625                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
626                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
627                 } else {
628                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
629                         return NULL;
630                 }
631         } else {                                                                                        // send failed -- return original message
632                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
633                         errno = EAGAIN;
634                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
635                 } else {
636                         msg->state = RMR_ERR_SENDFAILED;
637                 }
638
639                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
640         }
641
642         return msg;
643 }
644
645 /*
646         send message with maximum timeout.
647         Accept a message and send it to an endpoint based on message type.
648         If NNG reports that the send attempt timed out, or should be retried,
649         RMr will retry for approximately max_to microseconds; rounded to the next
650         higher value of 10.
651
652         Allocates a new message buffer for the next send. If a message type has
653         more than one group of endpoints defined, then the message will be sent
654         in round robin fashion to one endpoint in each group.
655
656         An endpoint will be looked up in the route table using the message type and
657         the subscription id. If the subscription id is "UNSET_SUBID", then only the
658         message type is used.  If the initial lookup, with a subid, fails, then a
659         second lookup using just the mtype is tried.
660
661         When msg->state is not OK, this function must set tp_state in the message as 
662         some API fucntions return the message directly and do not propigate errno into 
663         the message.
664
665         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
666                 it will return with an error and errno set to eagain. If the send is
667                 a limited fanout, then the returned status is the status of the last
668                 send attempt.
669
670 */
671 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
672         endpoint_t*     ep;                                     // end point that we're attempting to send to
673         rtable_ent_t*   rte;                    // the route table entry which matches the message key
674         int     nn_sock;                                        // endpoint socket (fd in si case) for send
675         uta_ctx_t*      ctx;
676         int                     group;                          // selected group to get socket for
677         int                     send_again;                     // true if the message must be sent again
678         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
679         int                     sock_ok;                        // got a valid socket from round robin select
680         char*           d1;
681         int                     ok_sends = 0;           // track number of ok sends
682
683         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
684                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
685                 if( msg != NULL ) {
686                         msg->state = RMR_ERR_BADARG;
687                         errno = EINVAL;                                                                                 // must ensure it's not eagain
688                         msg->tp_state = errno;
689                 }
690                 return msg;
691         }
692
693         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
694         if( msg->header == NULL ) {
695                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
696                 msg->state = RMR_ERR_NOHDR;
697                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
698                 msg->tp_state = errno;
699                 return msg;
700         }
701
702         if( max_to < 0 ) {
703                 max_to = ctx->send_retries;             // convert to retries
704         }
705
706         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
707                 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
708                 msg->state = RMR_ERR_NOENDPT;
709                 errno = ENXIO;                                                                          // must ensure it's not eagain
710                 msg->tp_state = errno;
711                 return msg;                                                                                     // caller can resend (maybe) or free
712         }
713
714         send_again = 1;                                                                                 // force loop entry
715         group = 0;                                                                                              // always start with group 0
716         while( send_again ) {
717                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
718                         sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
719                 } else {
720                         sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
721                         send_again = 0;
722                 }
723
724                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
725                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
726
727                 group++;
728
729                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
730                         if( send_again ) {
731                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
732                                 if( clone_m == NULL ) {
733                                         msg->state = RMR_ERR_SENDFAILED;
734                                         errno = ENOMEM;
735                                         msg->tp_state = errno;
736                                         rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
737                                         return msg;
738                                 }
739
740                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
741                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
742                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
743         
744                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
745                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
746                                         msg = clone_m;
747                                 } else {
748                                         ok_sends++;
749                                         msg = clone_m;                                                                          // clone will be the next to send
750                                 }
751                         } else {
752                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
753                                 if( DEBUG ) {
754                                         if( msg == NULL ) {
755                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
756                                         }
757                                 }
758                         }
759
760                         if( ep != NULL && msg != NULL ) {
761                                 switch( msg->state ) {
762                                         case RMR_OK:
763                                                 ep->scounts[EPSC_GOOD]++;
764                                                 break;
765                                 
766                                         case RMR_ERR_RETRY:
767                                                 ep->scounts[EPSC_TRANS]++;
768                                                 break;
769
770                                         default:
771                                                 ep->scounts[EPSC_FAIL]++;
772                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
773                                                 break;
774                                 }
775                         }
776                 } else {
777                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
778                         msg->state = RMR_ERR_NOENDPT;
779                         errno = ENXIO;
780                 }
781         }
782
783         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
784                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
785                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
786                         msg->state = RMR_OK;
787                 }
788         
789                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
790         
791                 msg->tp_state = errno;
792         }
793
794         return msg;                                                                     // last message caries the status of last/only send attempt
795 }
796
797
798 /*
799         A generic wrapper to the real send to keep wormhole stuff agnostic.
800         We assume the wormhole function vetted the buffer so we don't have to.
801 */
802 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
803         return send_msg( ctx, msg, ep->nn_sock, -1 );
804 }
805
806 #endif