Revert RTS to use unidirectional connection
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_40( char *p, char* label ) {
35         int i;
36
37         if( label )
38                 fprintf( stderr, ">>>>> %s p=%p\n", label, p );
39
40         for( i = 0; i < 40; i++ ) {
41                 fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
42         }
43         fprintf( stderr, "\n" );
44 }
45
46 /*
47         Translates the nng state passed in to one of ours that is suitable to put
48         into the message, and sets errno to something that might be useful.
49         If we don't have a specific RMr state, then we return the default (e.g.
50         receive failed).
51
52         The addition of the connection shut error code to the switch requires
53         that the NNG version at commit e618abf8f3db2a94269a (or after) be
54         used for compiling RMR. 
55 */
56 static inline int xlate_si_state( int state, int def_state ) {
57
58         switch( state ) {
59                 case SI_ERR_NONE:
60                         errno = 0;
61                         state = RMR_OK;
62                         break;
63
64                 default:
65                         state = def_state;
66                         errno = EAGAIN;
67                         break;
68
69                 case SI_ERR_QUEUED:
70                 case SI_ERR_TPORT:
71                 case SI_ERR_UPORT:
72                 case SI_ERR_FORK:
73                 case SI_ERR_HANDLE:
74                 case SI_ERR_SESSID:
75                 case SI_ERR_TP:
76                 case SI_ERR_SHUTD:
77                 case SI_ERR_NOFDS:
78                 case SI_ERR_SIGUSR1:
79                 case SI_ERR_SIGUSR2:
80                 case SI_ERR_DISC:
81                 case SI_ERR_TIMEOUT:
82                 case SI_ERR_ORDREL:
83                 case SI_ERR_SIGALRM:
84                 case SI_ERR_NOMEM:
85                 case SI_ERR_ADDR:
86                         errno  = ENOTSUP;
87                         state = def_state;
88                         break;
89         }
90
91         return state;
92 }
93
94 /*
95         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
96         a new message struct as well. Size is the size of the zc buffer to allocate (not
97         including our header). If size is 0, then the buffer allocated is the size previously
98         allocated (if msg is !nil) or the default size given at initialisation).
99
100         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
101         the context value is used.
102
103         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
104                 are zero copy, however ONLY the message is zero copy. We now allocate and use
105                 nng messages.
106 */
107 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
108         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
109         uta_mhdr_t*     hdr;                    // convenience pointer
110         int                     tr_len;                 // trace data len (default or override)
111         int*            alen;                   // convenience pointer to set allocated len
112
113         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
114
115         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
116         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
117         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
118
119         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
120                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
121                 if( msg == NULL ) {
122                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
123                         return NULL;                                                            // we used to exit -- that seems wrong
124                 }
125                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
126         } else {                                                                // user message or message from the ring
127                 if( mlen > msg->alloc_len ) {           // current allocation is too small
128                         msg->alloc_len = 0;                             // force tp_buffer realloc below
129                         if( msg->tp_buf ) {
130                                 free( msg->tp_buf );
131                         }
132                 } else {
133                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
134                 }
135         }
136
137         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
138
139         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
140                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
141                 abort( );                                                                                       // toss out a core file for this
142         }
143
144 /*
145         memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
146         memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TPHDR_LEN );         // NOT for production -- debugging eyecatcher
147 */
148         alen = (int *) msg->tp_buf;
149         *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
150
151         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
152         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
153         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
154                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
155                 hdr->sub_id = htonl( UNSET_SUBID );
156                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
157                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
158                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
159                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
160         }
161         msg->len = 0;                                                                                   // length of data in the payload
162         msg->cookie = 0x4942;
163         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
164         msg->sub_id = UNSET_SUBID;
165         msg->mtype = UNSET_MSGTYPE;
166         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
167         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
168         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
169         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
170         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
171         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
172         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
173
174         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
175
176         return msg;
177 }
178
179 /*
180         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
181         transport receive should allocate that on its own.
182 */
183 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
184         size_t  mlen;
185         uta_mhdr_t* hdr;                        // convenience pointer
186         rmr_mbuf_t* msg;
187
188         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
189                 if( msg->tp_buf ) {
190                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
191                 }
192         } else {
193                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
194                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
195                         return NULL;                                                    // this used to exit, but that seems wrong
196                 }
197         }
198
199         memset( msg, 0, sizeof( *msg ) );
200
201         msg->cookie = 0x4942;
202         msg->sub_id = UNSET_SUBID;
203         msg->mtype = UNSET_MSGTYPE;
204         msg->tp_buf = NULL;
205         msg->header = NULL;
206         msg->len = -1;                                                                                  // no payload; invalid len
207         msg->alloc_len = 0;
208         msg->payload = NULL;
209         msg->xaction = NULL;
210         msg->state = state;
211         msg->flags = 0;
212         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
213
214         return msg;
215 }
216
217 /*
218         This accepts a message with the assumption that only the tp_buf pointer is valid. It
219         sets all of the various header/payload/xaction pointers in the mbuf to the proper
220         spot in the transport layer buffer.  The len in the header is assumed to be the
221         allocated len (a receive buffer that nng created);
222
223         The alen parm is the assumed allocated length; assumed because it's a value likely
224         to have come from si receive and the actual alloc len might be larger, but we
225         can only assume this is the total usable space. Because we are managing a transport
226         header in the first n bytes of the real msg, we must adjust this length down by the
227         size of the tp header (for testing 50 bytes, but this should change to a struct if
228         we adopt this interface).
229
230         This function returns the message with an error state set if it detects that the
231         received message might have been truncated.  Check is done here as the calculation
232         is somewhat based on header version.
233 */
234 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
235         uta_mhdr_t* hdr = NULL;                 // current header
236         uta_v1mhdr_t* v1hdr;                    // version 1 header
237         int ver;
238         int     hlen;                                           // header len to use for a truncation check
239
240         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
241
242         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
243
244         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
245                 ver = 1;
246                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
247         } else {
248                 ver = ntohl( v1hdr->rmr_ver );
249         }
250
251         switch( ver ) {
252                 case 1:
253                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
254                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
255                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
256
257                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
258                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
259                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
260                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
261                         msg->state = RMR_OK;
262                         hlen = sizeof( uta_v1mhdr_t );
263                         break;
264
265                 default:                                                                                                        // current version always lands here
266                         hdr = (uta_mhdr_t *) msg->header;
267                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
268                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
269
270                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
271                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
272                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
273                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
274                         msg->sub_id = ntohl( hdr->sub_id );
275                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
276                         break;
277         }
278
279         if( msg->len > (msg->alloc_len - hlen ) ) {
280                 msg->state = RMR_ERR_TRUNC;
281                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
282         } else {
283                 msg->state = RMR_OK;
284         }
285 }
286
287 /*
288         This will clone a message into a new zero copy buffer and return the cloned message.
289 */
290 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
291         rmr_mbuf_t* nm;                 // new message buffer
292         size_t  mlen;
293         int state;
294         uta_mhdr_t* hdr;
295         uta_v1mhdr_t* v1hdr;
296
297         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
298         if( nm == NULL ) {
299                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
300                 exit( 1 );
301         }
302         memset( nm, 0, sizeof( *nm ) );
303
304         mlen = old_msg->alloc_len;                                                                              // length allocated before
305         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
306                 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
307                 abort();
308         }
309
310         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
311         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
312         switch( ntohl( v1hdr->rmr_ver ) ) {
313                 case 1:
314                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
315                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
316                         break;
317
318                 default:                                                                                        // current message always caught  here
319                         hdr = nm->header;
320                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
321                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
322                         break;
323         }
324
325         // --- these are all version agnostic -----------------------------------
326         nm->mtype = old_msg->mtype;
327         nm->sub_id = old_msg->sub_id;
328         nm->len = old_msg->len;                                                                 // length of data in the payload
329         nm->alloc_len = mlen;                                                                   // length of allocated payload
330
331         nm->xaction = hdr->xid;                                                                 // reference xaction
332         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
333         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
334         memcpy( nm->payload, old_msg->payload, old_msg->len );
335
336         return nm;
337 }
338
339 /*
340         This will clone a message with a change to the trace area in the header such that
341         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
342         The orignal message will be left unchanged, and a pointer to the new message is returned.
343         It is not possible to realloc buffers and change the data sizes.
344 */
345 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
346         rmr_mbuf_t* nm;                 // new message buffer
347         size_t  mlen;
348         int state;
349         uta_mhdr_t* hdr;
350         uta_v1mhdr_t* v1hdr;
351         int     tr_old_len;                     // tr size in new buffer
352         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
353         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
354
355         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
356         if( nm == NULL ) {
357                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
358                 exit( 1 );
359         }
360         memset( nm, 0, sizeof( *nm ) );
361
362         hdr = old_msg->header;
363         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
364
365         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
366         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
367
368         tpb_len = mlen + TP_HDR_LEN;
369         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
370                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
371                 exit( 1 );
372         }
373         memset( nm->tp_buf, 0, tpb_len );
374         memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
375         alen = (int *) nm->tp_buf;
376         *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
377
378         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
379
380         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
381         switch( ntohl( v1hdr->rmr_ver ) ) {
382                 case 1:
383                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
384                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
385                         break;
386
387                 default:                                                                                        // current message version always caught  here
388                         hdr = nm->header;
389                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
390                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
391
392                         if( RMR_D1_LEN( hdr )  ) {
393                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
394                         }
395                         if( RMR_D2_LEN( hdr )  ) {
396                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
397                         }
398
399                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
400                         break;
401         }
402
403         // --- these are all version agnostic -----------------------------------
404         nm->mtype = old_msg->mtype;
405         nm->sub_id = old_msg->sub_id;
406         nm->len = old_msg->len;                                                                 // length of data in the payload
407         nm->alloc_len = mlen;                                                                   // length of allocated payload
408
409         nm->xaction = hdr->xid;                                                                 // reference xaction
410         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
411         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
412         memcpy( nm->payload, old_msg->payload, old_msg->len );
413
414         return nm;
415 }
416
417 /*
418         Realloc the message such that the payload is at least payload_len bytes.  
419         The clone and copy options affect what portion of the original payload is copied to
420         the reallocated message, and whether or not the original payload is lost after the
421         reallocation process has finished.
422
423                 copy == true
424                 The entire payload from the original message will be coppied to the reallocated
425                 payload.
426
427                 copy == false
428                 Only the header (preserving return to sender information, message type, etc)
429                 is preserved after reallocation; the payload used lengrh is set to 0 and the
430                 payload is NOT initialised/cleared.
431
432                 clone == true
433                 The orignal message is preserved and a completely new message buffer and payload
434                 are allocated (even if the size given is the same). A pointer to the new message
435                 buffer is returned and it is the user application's responsibility to manage the
436                 old buffer (e.g. free when not needed).
437
438                 clone == false
439                 The old payload will be lost after reallocation. The message buffer pointer which
440                 is returned will likely reference the same structure (don't depend on that).
441                 
442
443         CAUTION:
444         If the message is not a message which was received, the mtype, sub-id, length values in the
445         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
446         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
447         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
448         settings and NOT from the copied RMR header in transport buffer.
449 */
450 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
451         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
452         size_t  mlen;
453         uta_mhdr_t* omhdr;              // old message header
454         int             tr_old_len;             // tr size in new buffer
455         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
456         int             hdr_len = 0;    // length of RMR and transport headers in old msg
457         void*   old_tp_buf;             // pointer to the old tp buffer
458         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
459         int             old_mt;                 // msg type and sub-id from the message passed in
460         int             old_sid;
461         int             old_len;
462         int             old_rfd;                // rts file descriptor from old message
463
464         if( old_msg == NULL || payload_len <= 0 ) {
465                 errno = EINVAL;
466                 return NULL;
467         }
468
469         old_mt = old_msg->mtype;                        // preserve mbuf info
470         old_sid = old_msg->sub_id;
471         old_len = old_msg->len;
472         old_rfd = old_msg->rts_fd;
473
474         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
475
476         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
477                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
478                 return old_msg;
479         }
480
481         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
482         old_tp_buf = old_msg->tp_buf;
483
484         if( clone ) {
485                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
486                 free_tp = 0;
487
488                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
489                 if( nm == NULL ) {
490                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
491                         return NULL;
492                 }
493                 memset( nm, 0, sizeof( *nm ) );
494                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
495         } else {
496                 nm = old_msg;
497         }
498
499         omhdr = old_msg->header;
500         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
501
502         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
503         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
504                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
505                 return NULL;
506         }
507
508         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
509         SET_HDR_LEN( nm->header );
510
511         if( copy ) {                                                                                                                            // if we need to copy the old payload too
512                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
513                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
514         } else {                                                                                                                                        // just need to copy header
515                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
516                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
517         }
518
519         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
520
521         if( !copy ) {
522                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
523                 nm->sub_id = -1;
524                 nm->len = 0;                                            // and len is 0
525         } else {
526                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
527                 nm->mtype = old_mt;
528                 nm->sub_id = old_sid;
529         }
530
531         if( free_tp ) {
532                 free( old_tp_buf );                             // we did not clone, so free b/c no references
533         }
534
535         return nm;
536 }
537
538 /*
539         For SI95 based transport all receives are driven through the threaded
540         ring and thus this function should NOT be called. If it is we will panic
541         and abort straight away.
542 */
543 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
544
545 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
546 exit( 1 );
547
548         return NULL;
549 }
550
551 /*
552         This does the hard work of actually sending the message to the given socket. On success,
553         a new message struct is returned. On error, the original msg is returned with the state
554         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
555         buffer will not be allocated and returned (mostly for call() interal processing since
556         the return message from call() is a received buffer, not a new one).
557
558         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
559         validation has been done prior.
560
561         When msg->state is not ok, this function must set tp_state in the message as some API 
562         fucntions return the message directly and do not propigate errno into the message.
563 */
564 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
565         int state;
566         uta_mhdr_t*     hdr;
567         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
568         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
569         int tot_len;                                                    // total send length (hdr + user data + tp header)
570
571         // future: ensure that application did not overrun the XID buffer; last byte must be 0
572
573         hdr = (uta_mhdr_t *) msg->header;
574         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
575         hdr->sub_id = htonl( msg->sub_id );
576         hdr->plen = htonl( msg->len );
577         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
578
579         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
580                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
581                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
582         }
583
584         if( retries == 0 ) {
585                 spin_retries = 100;
586                 retries++;
587         }
588
589         errno = 0;
590         msg->state = RMR_OK;
591         do {
592                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
593                 if( tot_len > msg->alloc_len ) {
594                         tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
595                 }
596                 *((int*) msg->tp_buf) = tot_len;
597
598                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
599                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
600
601                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
602                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
603                         msg->state = state;
604                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
605                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
606                                         retries--;
607                                         if( retries > 0 ) {                                     // only if we'll loop through again
608                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
609                                         }
610                                         spin_retries = 1000;
611                                 }
612                         } else {
613                                 state = 0;                      // don't loop
614                         }
615                 } else {
616                         if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
617                         state = 0;
618                         msg->state = RMR_OK;
619                         hdr = NULL;
620                 }
621         } while( state && retries > 0 );
622
623         if( msg->state == RMR_OK ) {                                                                    // successful send
624                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
625                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
626                 } else {
627                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
628                         return NULL;
629                 }
630         } else {                                                                                        // send failed -- return original message
631                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
632                         errno = EAGAIN;
633                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
634                 } else {
635                         msg->state = RMR_ERR_SENDFAILED;
636                 }
637
638                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
639         }
640
641         return msg;
642 }
643
644 /*
645         send message with maximum timeout.
646         Accept a message and send it to an endpoint based on message type.
647         If NNG reports that the send attempt timed out, or should be retried,
648         RMr will retry for approximately max_to microseconds; rounded to the next
649         higher value of 10.
650
651         Allocates a new message buffer for the next send. If a message type has
652         more than one group of endpoints defined, then the message will be sent
653         in round robin fashion to one endpoint in each group.
654
655         An endpoint will be looked up in the route table using the message type and
656         the subscription id. If the subscription id is "UNSET_SUBID", then only the
657         message type is used.  If the initial lookup, with a subid, fails, then a
658         second lookup using just the mtype is tried.
659
660         When msg->state is not OK, this function must set tp_state in the message as 
661         some API fucntions return the message directly and do not propigate errno into 
662         the message.
663
664         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
665                 it will return with an error and errno set to eagain. If the send is
666                 a limited fanout, then the returned status is the status of the last
667                 send attempt.
668
669 */
670 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
671         endpoint_t*     ep;                                     // end point that we're attempting to send to
672         rtable_ent_t*   rte;                    // the route table entry which matches the message key
673         int     nn_sock;                                        // endpoint socket (fd in si case) for send
674         uta_ctx_t*      ctx;
675         int                     group;                          // selected group to get socket for
676         int                     send_again;                     // true if the message must be sent again
677         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
678         int                     sock_ok;                        // got a valid socket from round robin select
679         char*           d1;
680         int                     ok_sends = 0;           // track number of ok sends
681
682         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
683                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
684                 if( msg != NULL ) {
685                         msg->state = RMR_ERR_BADARG;
686                         errno = EINVAL;                                                                                 // must ensure it's not eagain
687                         msg->tp_state = errno;
688                 }
689                 return msg;
690         }
691
692         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
693         if( msg->header == NULL ) {
694                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
695                 msg->state = RMR_ERR_NOHDR;
696                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
697                 msg->tp_state = errno;
698                 return msg;
699         }
700
701         if( max_to < 0 ) {
702                 max_to = ctx->send_retries;             // convert to retries
703         }
704
705         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
706                 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
707                 msg->state = RMR_ERR_NOENDPT;
708                 errno = ENXIO;                                                                          // must ensure it's not eagain
709                 msg->tp_state = errno;
710                 return msg;                                                                                     // caller can resend (maybe) or free
711         }
712
713         send_again = 1;                                                                                 // force loop entry
714         group = 0;                                                                                              // always start with group 0
715         while( send_again ) {
716                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
717                         sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
718                 } else {
719                         sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
720                         send_again = 0;
721                 }
722
723                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
724                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
725
726                 group++;
727
728                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
729                         if( send_again ) {
730                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
731                                 if( clone_m == NULL ) {
732                                         msg->state = RMR_ERR_SENDFAILED;
733                                         errno = ENOMEM;
734                                         msg->tp_state = errno;
735                                         rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
736                                         return msg;
737                                 }
738
739                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
740                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
741                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
742         
743                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
744                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
745                                         msg = clone_m;
746                                 } else {
747                                         ok_sends++;
748                                         msg = clone_m;                                                                          // clone will be the next to send
749                                 }
750                         } else {
751                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
752                                 if( DEBUG ) {
753                                         if( msg == NULL ) {
754                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
755                                         }
756                                 }
757                         }
758
759                         if( ep != NULL && msg != NULL ) {
760                                 switch( msg->state ) {
761                                         case RMR_OK:
762                                                 ep->scounts[EPSC_GOOD]++;
763                                                 break;
764                                 
765                                         case RMR_ERR_RETRY:
766                                                 ep->scounts[EPSC_TRANS]++;
767                                                 break;
768
769                                         default:
770                                                 ep->scounts[EPSC_FAIL]++;
771                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
772                                                 break;
773                                 }
774                         }
775                 } else {
776                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
777                         msg->state = RMR_ERR_NOENDPT;
778                         errno = ENXIO;
779                 }
780         }
781
782         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
783                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
784                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
785                         msg->state = RMR_OK;
786                 }
787         
788                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
789         
790                 msg->tp_state = errno;
791         }
792
793         return msg;                                                                     // last message caries the status of last/only send attempt
794 }
795
796
797 /*
798         A generic wrapper to the real send to keep wormhole stuff agnostic.
799         We assume the wormhole function vetted the buffer so we don't have to.
800 */
801 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
802         return send_msg( ctx, msg, ep->nn_sock, -1 );
803 }
804
805 #endif