Correct excessive TCP connection bug
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_n( char *p, char* label, int n ) {
35         int i;
36         int j;
37         int t = 0;
38         int     rows;
39
40
41         if( label ) {
42                 fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
43         }
44
45         rows = (n/16) + ((n % 16) ? 1 : 0);
46
47         for( j = 0; j < rows; j++ ) {
48                 fprintf( stderr, "[DUMP] %04x: ", j * 16 );
49
50                 for( i = 0; t < n && i < 16; i++, t++ ) {
51                         fprintf( stderr, "%02x ", (unsigned char) *p );
52                         p++;
53                 }
54                 fprintf( stderr, "\n" );
55         }
56 }
57
58 /*
59         backwards compatability.
60 */
61 static void dump_40( char *p, char* label ) {
62         dump_n( p, label, 40 );
63 }
64
65 /*
66         Translates the nng state passed in to one of ours that is suitable to put
67         into the message, and sets errno to something that might be useful.
68         If we don't have a specific RMr state, then we return the default (e.g.
69         receive failed).
70
71         The addition of the connection shut error code to the switch requires
72         that the NNG version at commit e618abf8f3db2a94269a (or after) be
73         used for compiling RMR.
74 */
75 static inline int xlate_si_state( int state, int def_state ) {
76
77         switch( state ) {
78                 case SI_ERR_NONE:
79                         errno = 0;
80                         state = RMR_OK;
81                         break;
82
83                 default:
84                         state = def_state;
85                         errno = EAGAIN;
86                         break;
87
88                 case SI_ERR_QUEUED:
89                 case SI_ERR_TPORT:
90                 case SI_ERR_UPORT:
91                 case SI_ERR_FORK:
92                 case SI_ERR_HANDLE:
93                 case SI_ERR_SESSID:
94                 case SI_ERR_TP:
95                 case SI_ERR_SHUTD:
96                 case SI_ERR_NOFDS:
97                 case SI_ERR_SIGUSR1:
98                 case SI_ERR_SIGUSR2:
99                 case SI_ERR_DISC:
100                 case SI_ERR_TIMEOUT:
101                 case SI_ERR_ORDREL:
102                 case SI_ERR_SIGALRM:
103                 case SI_ERR_NOMEM:
104                 case SI_ERR_ADDR:
105                         errno  = ENOTSUP;
106                         state = def_state;
107                         break;
108         }
109
110         return state;
111 }
112
113 /*
114         Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
115         this will put in the size such that it is compatable with old versions
116         of RMR (that expect the message size to not be in network byte order)
117         and with new versions that do. See extract function in mt_call_si_static.c
118         for details on what ends up in the buffer.
119 */
120 static inline void insert_mlen( uint32_t len, char* buf ) {
121         uint32_t* blen;                                                 // pointer into buffer where we'll add the len
122
123         blen = (uint32_t *) buf;                                // old systems expect an unconverted integer
124         *blen = len;
125
126         blen++;
127         *blen = htonl( len );                                   // new systems want a converted integer
128
129         memset( &buf[TP_SZFIELD_LEN], 0, 4 );   // clear to prevent future conversion issues
130         buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER;   // marker to flag this is generated by a new message
131 }
132
133 /*
134         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
135         a new message struct as well. Size is the size of the zc buffer to allocate (not
136         including our header). If size is 0, then the buffer allocated is the size previously
137         allocated (if msg is !nil) or the default size given at initialisation).
138
139         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
140         the context value is used.
141
142         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
143                 are zero copy, however ONLY the message is zero copy. We now allocate and use
144                 nng messages.
145 */
146 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
147         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
148         uta_mhdr_t*     hdr;                    // convenience pointer
149         int                     tr_len;                 // trace data len (default or override)
150         int*            alen;                   // convenience pointer to set allocated len
151
152         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
153
154         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
155         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
156         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
157
158         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
159                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
160                 if( msg == NULL ) {
161                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
162                         return NULL;                                                            // we used to exit -- that seems wrong
163                 }
164                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
165         } else {                                                                // user message or message from the ring
166                 if( mlen > msg->alloc_len ) {           // current allocation is too small
167                         msg->alloc_len = 0;                             // force tp_buffer realloc below
168                         if( msg->tp_buf ) {
169                                 free( msg->tp_buf );
170                                 msg->tp_buf = NULL;
171                         }
172                 } else {
173                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
174                 }
175         }
176
177         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
178
179         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
180                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
181                 abort( );                                                                                       // toss out a core file for this
182         }
183
184         if( DEBUG ) {
185                 // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
186                 memset( msg->tp_buf, 0, mlen );
187                 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );                // do NOT use a $ in this string!
188         }
189
190         insert_mlen( (uint32_t) mlen, msg->tp_buf );                    // this will likely be overwriten on send to shirnk
191
192         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
193         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
194         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
195                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
196                 hdr->sub_id = htonl( UNSET_SUBID );
197                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
198                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
199                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
200                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
201         }
202         msg->len = 0;                                                                                   // length of data in the payload
203         msg->cookie = 0x4942;
204         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
205         msg->sub_id = UNSET_SUBID;
206         msg->mtype = UNSET_MSGTYPE;
207         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
208         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
209         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
210         msg->flags = MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
211         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
212         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
213         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
214
215         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
216
217         return msg;
218 }
219
220 /*
221         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
222         transport receive should allocate that on its own.
223 */
224 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
225         size_t  mlen;
226         uta_mhdr_t* hdr;                        // convenience pointer
227         rmr_mbuf_t* msg;
228
229         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
230                 if( msg->tp_buf ) {
231                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
232                 }
233         } else {
234                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
235                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
236                         return NULL;                                                    // this used to exit, but that seems wrong
237                 }
238         }
239
240         memset( msg, 0, sizeof( *msg ) );
241
242         msg->cookie = 0x4942;
243         msg->sub_id = UNSET_SUBID;
244         msg->mtype = UNSET_MSGTYPE;
245         msg->tp_buf = NULL;
246         msg->header = NULL;
247         msg->len = -1;                                                                                  // no payload; invalid len
248         msg->alloc_len = 0;
249         msg->payload = NULL;
250         msg->xaction = NULL;
251         msg->state = state;
252         msg->flags = 0;
253         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
254
255         return msg;
256 }
257
258 /*
259         This accepts a message with the assumption that only the tp_buf pointer is valid. It
260         sets all of the various header/payload/xaction pointers in the mbuf to the proper
261         spot in the transport layer buffer.  The len in the header is assumed to be the
262         allocated len (a receive buffer that nng created);
263
264         The alen parm is the assumed allocated length; assumed because it's a value likely
265         to have come from si receive and the actual alloc len might be larger, but we
266         can only assume this is the total usable space. Because we are managing a transport
267         header in the first n bytes of the real msg, we must adjust this length down by the
268         size of the tp header (for testing 50 bytes, but this should change to a struct if
269         we adopt this interface).
270
271         This function returns the message with an error state set if it detects that the
272         received message might have been truncated.  Check is done here as the calculation
273         is somewhat based on header version.
274 */
275 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
276         uta_mhdr_t* hdr = NULL;                 // current header
277         uta_v1mhdr_t* v1hdr;                    // version 1 header
278         int ver;
279         int     hlen;                                           // header len to use for a truncation check
280
281         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
282
283         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
284
285         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
286                 ver = 1;
287                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
288         } else {
289                 ver = ntohl( v1hdr->rmr_ver );
290         }
291
292         switch( ver ) {
293                 case 1:
294                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
295                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
296                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
297
298                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
299                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
300                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
301                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
302                         msg->state = RMR_OK;
303                         hlen = sizeof( uta_v1mhdr_t );
304                         break;
305
306                 default:                                                                                                        // current version always lands here
307                         hdr = (uta_mhdr_t *) msg->header;
308                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
309                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
310
311                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
312                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
313                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
314                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
315                         msg->sub_id = ntohl( hdr->sub_id );
316                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
317                         break;
318         }
319
320         if( msg->len > (msg->alloc_len - hlen ) ) {
321                 msg->state = RMR_ERR_TRUNC;
322                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
323         } else {
324                 msg->state = RMR_OK;
325         }
326 }
327
328 /*
329         This will clone a message into a new zero copy buffer and return the cloned message.
330 */
331 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
332         rmr_mbuf_t* nm;                 // new message buffer
333         size_t  mlen;
334         int state;
335         uta_mhdr_t* hdr;
336         uta_v1mhdr_t* v1hdr;
337
338         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
339         if( nm == NULL ) {
340                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
341                 exit( 1 );
342         }
343         memset( nm, 0, sizeof( *nm ) );
344
345         mlen = old_msg->alloc_len;                                                                              // length allocated before
346         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
347                 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
348                 abort();
349         }
350
351         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
352         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
353         switch( ntohl( v1hdr->rmr_ver ) ) {
354                 case 1:
355                         hdr = nm->header;
356                         memcpy( hdr, old_msg->header, sizeof( *v1hdr ) );               // copy complete header
357                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
358                         break;
359
360                 default:                                                                                        // current message always caught  here
361                         hdr = nm->header;
362                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
363                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
364                         break;
365         }
366
367         // --- these are all version agnostic -----------------------------------
368         nm->mtype = old_msg->mtype;
369         nm->sub_id = old_msg->sub_id;
370         nm->len = old_msg->len;                                                                 // length of data in the payload
371         nm->alloc_len = mlen;                                                                   // length of allocated payload
372
373         nm->xaction = &hdr->xid[0];                                                             // reference xaction
374         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
375         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
376         memcpy( nm->payload, old_msg->payload, old_msg->len );
377
378         return nm;
379 }
380
381 /*
382         This will clone a message with a change to the trace area in the header such that
383         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
384         The orignal message will be left unchanged, and a pointer to the new message is returned.
385         It is not possible to realloc buffers and change the data sizes.
386 */
387 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
388         rmr_mbuf_t* nm;                 // new message buffer
389         size_t  mlen;
390         int state;
391         uta_mhdr_t* hdr;
392         uta_v1mhdr_t* v1hdr;
393         int     tr_old_len;                     // tr size in new buffer
394         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
395         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
396
397         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
398         if( nm == NULL ) {
399                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
400                 exit( 1 );
401         }
402         memset( nm, 0, sizeof( *nm ) );
403
404         hdr = old_msg->header;
405         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
406
407         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
408         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
409
410         tpb_len = mlen + TP_HDR_LEN;
411         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
412                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
413                 exit( 1 );
414         }
415         if( DEBUG ) {
416                 memset( nm->tp_buf, 0, tpb_len );
417                 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );         // DEBUGGING do NOT use $ in this string!!
418         }
419
420         insert_mlen( (uint32_t) tpb_len, nm->tp_buf );                  // this len will likely be reset on send to shrink
421
422         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
423
424         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
425         switch( ntohl( v1hdr->rmr_ver ) ) {
426                 case 1:
427                         v1hdr = nm->header;
428                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
429                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
430                         break;
431
432                 default:                                                                                        // current message version always caught  here
433                         hdr = nm->header;
434                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
435                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
436
437                         if( RMR_D1_LEN( hdr )  ) {
438                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
439                         }
440                         if( RMR_D2_LEN( hdr )  ) {
441                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
442                         }
443
444                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
445                         break;
446         }
447
448         // --- these are all version agnostic -----------------------------------
449         nm->mtype = old_msg->mtype;
450         nm->sub_id = old_msg->sub_id;
451         nm->len = old_msg->len;                                                                 // length of data in the payload
452         nm->alloc_len = mlen;                                                                   // length of allocated payload
453
454         nm->xaction = &hdr->xid[0];                                                             // reference xaction
455         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
456         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
457         memcpy( nm->payload, old_msg->payload, old_msg->len );
458
459         return nm;
460 }
461
462 /*
463         Realloc the message such that the payload is at least payload_len bytes.
464         The clone and copy options affect what portion of the original payload is copied to
465         the reallocated message, and whether or not the original payload is lost after the
466         reallocation process has finished.
467
468                 copy == true
469                 The entire payload from the original message will be coppied to the reallocated
470                 payload.
471
472                 copy == false
473                 Only the header (preserving return to sender information, message type, etc)
474                 is preserved after reallocation; the payload used lengrh is set to 0 and the
475                 payload is NOT initialised/cleared.
476
477                 clone == true
478                 The orignal message is preserved and a completely new message buffer and payload
479                 are allocated (even if the size given is the same). A pointer to the new message
480                 buffer is returned and it is the user application's responsibility to manage the
481                 old buffer (e.g. free when not needed).
482
483                 clone == false
484                 The old payload will be lost after reallocation. The message buffer pointer which
485                 is returned will likely reference the same structure (don't depend on that).
486
487
488         CAUTION:
489         If the message is not a message which was received, the mtype, sub-id, length values in the
490         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
491         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
492         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
493         settings and NOT from the copied RMR header in transport buffer.
494 */
495 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
496         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
497         size_t  mlen;
498         uta_mhdr_t* omhdr;              // old message header
499         int             tr_old_len;             // tr size in new buffer
500         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
501         int             hdr_len = 0;    // length of RMR and transport headers in old msg
502         void*   old_tp_buf;             // pointer to the old tp buffer
503         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
504         int             old_mt;                 // msg type and sub-id from the message passed in
505         int             old_sid;
506         int             old_len;
507         int             old_rfd;                // rts file descriptor from old message
508
509         if( old_msg == NULL || payload_len <= 0 ) {
510                 errno = EINVAL;
511                 return NULL;
512         }
513
514         old_mt = old_msg->mtype;                        // preserve mbuf info
515         old_sid = old_msg->sub_id;
516         old_len = old_msg->len;
517         old_rfd = old_msg->rts_fd;
518
519         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
520
521         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
522                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
523                 return old_msg;
524         }
525
526         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
527         old_tp_buf = old_msg->tp_buf;
528
529         if( clone ) {
530                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
531                 free_tp = 0;
532
533                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
534                 if( nm == NULL ) {
535                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
536                         return NULL;
537                 }
538                 memset( nm, 0, sizeof( *nm ) );
539                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
540         } else {
541                 nm = old_msg;
542         }
543
544         omhdr = old_msg->header;
545         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
546
547         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
548         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
549                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
550                 free( nm );
551                 return NULL;
552         }
553
554         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
555         SET_HDR_LEN( nm->header );
556
557         if( copy ) {                                                                                                                            // if we need to copy the old payload too
558                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
559                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
560         } else {                                                                                                                                        // just need to copy header
561                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
562                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
563         }
564
565         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
566
567         if( !copy ) {
568                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
569                 nm->sub_id = -1;
570                 nm->len = 0;                                            // and len is 0
571         } else {
572                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
573                 nm->mtype = old_mt;
574                 nm->sub_id = old_sid;
575         }
576
577         if( free_tp ) {
578                 free( old_tp_buf );                             // we did not clone, so free b/c no references
579         }
580
581         return nm;
582 }
583
584 /*
585         For SI95 based transport all receives are driven through the threaded
586         ring and thus this function should NOT be called. If it is we will panic
587         and abort straight away.
588 */
589 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
590
591 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
592 exit( 1 );
593
594         return NULL;
595 }
596
597 /*
598         This does the hard work of actually sending the message to the given socket. On success,
599         a new message struct is returned. On error, the original msg is returned with the state
600         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
601         buffer will not be allocated and returned (mostly for call() interal processing since
602         the return message from call() is a received buffer, not a new one).
603
604         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
605         validation has been done prior.
606
607         When msg->state is not ok, this function must set tp_state in the message as some API
608         fucntions return the message directly and do not propigate errno into the message.
609 */
610 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
611         int state;
612         uta_mhdr_t*     hdr;
613         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
614         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
615         int tot_len;                                                    // total send length (hdr + user data + tp header)
616
617         // future: ensure that application did not overrun the XID buffer; last byte must be 0
618
619         hdr = (uta_mhdr_t *) msg->header;
620         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
621         hdr->sub_id = htonl( msg->sub_id );
622         hdr->plen = htonl( msg->len );
623         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
624
625         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
626                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                    // must overlay the source to be ours
627                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
628         }
629
630         if( retries == 0 ) {
631                 spin_retries = 100;
632                 retries++;
633         }
634
635         errno = 0;
636         msg->state = RMR_OK;
637         do {
638                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
639                 if( tot_len > msg->alloc_len ) {
640                         tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
641                 }
642                 insert_mlen( tot_len, msg->tp_buf );    // shrink to fit
643
644                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
645                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
646
647                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
648                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
649                         msg->state = state;
650                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
651                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
652                                         retries--;
653                                         if( retries > 0 ) {                                     // only if we'll loop through again
654                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
655                                         }
656                                         spin_retries = 1000;
657                                 }
658                         } else {
659                                 state = 0;                      // don't loop
660                         }
661                 } else {
662                         if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
663                         state = 0;
664                         msg->state = RMR_OK;
665                         hdr = NULL;
666                 }
667         } while( state && retries > 0 );
668
669         if( msg->state == RMR_OK ) {                                                                    // successful send
670                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
671                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
672                 } else {
673                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
674                         return NULL;
675                 }
676         } else {                                                                                        // send failed or would block -- return original message
677                 if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
678                         errno = EAGAIN;
679                         msg->state = RMR_ERR_RETRY;
680                 } else {
681                         rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
682                         msg->state = RMR_ERR_SENDFAILED;
683                 }
684
685                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
686         }
687
688         return msg;
689 }
690
691 /*
692         send message with maximum timeout.
693         Accept a message and send it to an endpoint based on message type.
694         If NNG reports that the send attempt timed out, or should be retried,
695         RMr will retry for approximately max_to microseconds; rounded to the next
696         higher value of 10.
697
698         Allocates a new message buffer for the next send. If a message type has
699         more than one group of endpoints defined, then the message will be sent
700         in round robin fashion to one endpoint in each group.
701
702         An endpoint will be looked up in the route table using the message type and
703         the subscription id. If the subscription id is "UNSET_SUBID", then only the
704         message type is used.  If the initial lookup, with a subid, fails, then a
705         second lookup using just the mtype is tried.
706
707         When msg->state is not OK, this function must set tp_state in the message as
708         some API fucntions return the message directly and do not propigate errno into
709         the message.
710
711         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
712                 it will return with an error and errno set to eagain. If the send is
713                 a limited fanout, then the returned status is the status of the last
714                 send attempt.
715
716 */
717 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
718         endpoint_t*     ep;                                     // end point that we're attempting to send to
719         rtable_ent_t*   rte;                    // the route table entry which matches the message key
720         int     nn_sock;                                        // endpoint socket (fd in si case) for send
721         uta_ctx_t*      ctx;
722         int                     group;                          // selected group to get socket for
723         int                     send_again;                     // true if the message must be sent again
724         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
725         int                     sock_ok;                        // got a valid socket from round robin select
726         char*           d1;
727         int                     ok_sends = 0;           // track number of ok sends
728         route_table_t*  rt;                             // active route table
729
730         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
731                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
732                 if( msg != NULL ) {
733                         msg->state = RMR_ERR_BADARG;
734                         errno = EINVAL;                                                                                 // must ensure it's not eagain
735                         msg->tp_state = errno;
736                 }
737                 return msg;
738         }
739
740         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
741         if( msg->header == NULL ) {
742                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
743                 msg->state = RMR_ERR_NOHDR;
744                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
745                 msg->tp_state = errno;
746                 return msg;
747         }
748
749         if( max_to < 0 ) {
750                 max_to = ctx->send_retries;             // convert to retries
751         }
752
753         rt = get_rt( ctx );                                                                             // get active route table and up ref count
754         if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) {                // find the entry which matches subid/type allow fallback to type only key
755                 release_rt( ctx, rt );
756                 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
757                 msg->state = RMR_ERR_NOENDPT;
758                 errno = ENXIO;                                                                          // must ensure it's not eagain
759                 msg->tp_state = errno;
760                 return msg;                                                                                     // caller can resend (maybe) or free
761         }
762
763         send_again = 1;                                                                                 // force loop entry
764         group = 0;                                                                                              // always start with group 0
765         while( send_again ) {
766                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
767                         sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
768                 } else {
769                         sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
770                         send_again = 0;
771                 }
772
773                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
774                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
775
776                 group++;
777
778                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
779                         if( send_again ) {
780                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
781                                 if( clone_m == NULL ) {
782                                         release_rt( ctx, rt );
783                                         msg->state = RMR_ERR_SENDFAILED;
784                                         errno = ENOMEM;
785                                         msg->tp_state = errno;
786                                         rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
787                                         return msg;
788                                 }
789
790                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
791                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
792                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
793
794                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
795                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
796                                         msg = clone_m;
797                                 } else {
798                                         ok_sends++;
799                                         msg = clone_m;                                                                          // clone will be the next to send
800                                         msg->state = RMR_OK;
801                                 }
802                         } else {
803                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
804                                 if( DEBUG ) {
805                                         if( msg == NULL ) {
806                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );
807                                         }
808                                 }
809                         }
810
811                         if( ep != NULL && msg != NULL ) {
812                                 switch( msg->state ) {
813                                         case RMR_OK:
814                                                 ep->scounts[EPSC_GOOD]++;
815                                                 break;
816
817                                         case RMR_ERR_RETRY:
818                                                 ep->scounts[EPSC_TRANS]++;
819                                                 break;
820
821                                         default:
822                                                 ep->scounts[EPSC_FAIL]++;
823                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
824                                                 break;
825                                 }
826                         }
827                 } else {
828                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
829                         msg->state = RMR_ERR_NOENDPT;
830                         errno = ENXIO;
831                 }
832         }
833
834         release_rt( ctx, rt );                          // we can safely dec the ref counter now
835
836         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
837                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
838                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
839                         msg->state = RMR_OK;
840                 }
841
842                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
843
844                 msg->tp_state = errno;
845         }
846
847         return msg;                                                                     // last message caries the status of last/only send attempt
848 }
849
850
851 /*
852         A generic wrapper to the real send to keep wormhole stuff agnostic.
853         We assume the wormhole function vetted the buffer so we don't have to.
854 */
855 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
856         return send_msg( ctx, msg, ep->nn_sock, -1 );
857 }
858
859 #endif