Fix SI95 transport header length bug
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_n( char *p, char* label, int n ) {
35         int i;
36         int j;
37         int t = 0;
38         int     rows;
39
40
41         if( label ) {
42                 fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
43         }
44
45         rows = (n/16) + ((n % 16) ? 1 : 0);
46
47         for( j = 0; j < rows; j++ ) {
48                 fprintf( stderr, "[DUMP] %04x: ", j * 16 );
49
50                 for( i = 0; t < n && i < 16; i++, t++ ) {
51                         fprintf( stderr, "%02x ", (unsigned char) *p );
52                         p++;
53                 }
54                 fprintf( stderr, "\n" );
55         }
56 }
57
58 /*
59         backwards compatability.
60 */
61 static void dump_40( char *p, char* label ) {
62         dump_n( p, label, 40 );
63 }
64
65 /*
66         Translates the nng state passed in to one of ours that is suitable to put
67         into the message, and sets errno to something that might be useful.
68         If we don't have a specific RMr state, then we return the default (e.g.
69         receive failed).
70
71         The addition of the connection shut error code to the switch requires
72         that the NNG version at commit e618abf8f3db2a94269a (or after) be
73         used for compiling RMR.
74 */
75 static inline int xlate_si_state( int state, int def_state ) {
76
77         switch( state ) {
78                 case SI_ERR_NONE:
79                         errno = 0;
80                         state = RMR_OK;
81                         break;
82
83                 default:
84                         state = def_state;
85                         errno = EAGAIN;
86                         break;
87
88                 case SI_ERR_QUEUED:
89                 case SI_ERR_TPORT:
90                 case SI_ERR_UPORT:
91                 case SI_ERR_FORK:
92                 case SI_ERR_HANDLE:
93                 case SI_ERR_SESSID:
94                 case SI_ERR_TP:
95                 case SI_ERR_SHUTD:
96                 case SI_ERR_NOFDS:
97                 case SI_ERR_SIGUSR1:
98                 case SI_ERR_SIGUSR2:
99                 case SI_ERR_DISC:
100                 case SI_ERR_TIMEOUT:
101                 case SI_ERR_ORDREL:
102                 case SI_ERR_SIGALRM:
103                 case SI_ERR_NOMEM:
104                 case SI_ERR_ADDR:
105                         errno  = ENOTSUP;
106                         state = def_state;
107                         break;
108         }
109
110         return state;
111 }
112
113 /*
114         Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
115         this will put in the size such that it is compatable with old versions
116         of RMR (that expect the message size to not be in network byte order)
117         and with new versions that do. See extract function in mt_call_si_static.c
118         for details on what ends up in the buffer.
119 */
120 static inline void insert_mlen( uint32_t len, char* buf ) {
121         uint32_t* blen;                                                 // pointer into buffer where we'll add the len
122
123         blen = (uint32_t *) buf;                                // old systems expect an unconverted integer
124         *blen = len;
125
126         blen++;
127         *blen = htonl( len );                                   // new systems want a converted integer
128
129         memset( &buf[TP_SZFIELD_LEN], 0, 4 );   // clear to prevent future conversion issues
130         buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER;   // marker to flag this is generated by a new message
131 }
132
133 /*
134         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
135         a new message struct as well. Size is the size of the zc buffer to allocate (not
136         including our header). If size is 0, then the buffer allocated is the size previously
137         allocated (if msg is !nil) or the default size given at initialisation).
138
139         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
140         the context value is used.
141
142         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
143                 are zero copy, however ONLY the message is zero copy. We now allocate and use
144                 nng messages.
145 */
146 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
147         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
148         uta_mhdr_t*     hdr;                    // convenience pointer
149         int                     tr_len;                 // trace data len (default or override)
150         int*            alen;                   // convenience pointer to set allocated len
151
152         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
153
154         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
155         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
156         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
157
158         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
159                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
160                 if( msg == NULL ) {
161                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
162                         return NULL;                                                            // we used to exit -- that seems wrong
163                 }
164                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
165         } else {                                                                // user message or message from the ring
166                 if( mlen > msg->alloc_len ) {           // current allocation is too small
167                         msg->alloc_len = 0;                             // force tp_buffer realloc below
168                         if( msg->tp_buf ) {
169                                 free( msg->tp_buf );
170                                 msg->tp_buf = NULL;
171                         }
172                 } else {
173                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
174                 }
175         }
176
177         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
178
179         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
180                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
181                 abort( );                                                                                       // toss out a core file for this
182         }
183
184         if( DEBUG ) {
185                 // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
186                 memset( msg->tp_buf, 0, mlen );
187                 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );                // do NOT use a $ in this string!
188         }
189
190         insert_mlen( (uint32_t) mlen, msg->tp_buf );                    // this will likely be overwriten on send to shirnk
191
192         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
193         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
194         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
195                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
196                 hdr->sub_id = htonl( UNSET_SUBID );
197                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
198                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
199                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
200                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
201         }
202         msg->len = 0;                                                                                   // length of data in the payload
203         msg->cookie = 0x4942;
204         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
205         msg->sub_id = UNSET_SUBID;
206         msg->mtype = UNSET_MSGTYPE;
207         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
208         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
209         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
210         msg->flags = MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
211         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
212         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
213         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
214
215         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
216
217         return msg;
218 }
219
220 /*
221         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
222         transport receive should allocate that on its own.
223 */
224 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
225         size_t  mlen;
226         uta_mhdr_t* hdr;                        // convenience pointer
227         rmr_mbuf_t* msg;
228
229         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
230                 if( msg->tp_buf ) {
231                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
232                 }
233         } else {
234                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
235                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
236                         return NULL;                                                    // this used to exit, but that seems wrong
237                 }
238         }
239
240         memset( msg, 0, sizeof( *msg ) );
241
242         msg->cookie = 0x4942;
243         msg->sub_id = UNSET_SUBID;
244         msg->mtype = UNSET_MSGTYPE;
245         msg->tp_buf = NULL;
246         msg->header = NULL;
247         msg->len = -1;                                                                                  // no payload; invalid len
248         msg->alloc_len = 0;
249         msg->payload = NULL;
250         msg->xaction = NULL;
251         msg->state = state;
252         msg->flags = 0;
253         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
254
255         return msg;
256 }
257
258 /*
259         This accepts a message with the assumption that only the tp_buf pointer is valid. It
260         sets all of the various header/payload/xaction pointers in the mbuf to the proper
261         spot in the transport layer buffer.  The len in the header is assumed to be the
262         allocated len (a receive buffer that nng created);
263
264         The alen parm is the assumed allocated length; assumed because it's a value likely
265         to have come from si receive and the actual alloc len might be larger, but we
266         can only assume this is the total usable space. Because we are managing a transport
267         header in the first n bytes of the real msg, we must adjust this length down by the
268         size of the tp header (for testing 50 bytes, but this should change to a struct if
269         we adopt this interface).
270
271         This function returns the message with an error state set if it detects that the
272         received message might have been truncated.  Check is done here as the calculation
273         is somewhat based on header version.
274 */
275 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
276         uta_mhdr_t* hdr = NULL;                 // current header
277         uta_v1mhdr_t* v1hdr;                    // version 1 header
278         int ver;
279         int     hlen;                                           // header len to use for a truncation check
280
281         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
282
283         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
284
285         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
286                 ver = 1;
287                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
288         } else {
289                 ver = ntohl( v1hdr->rmr_ver );
290         }
291
292         switch( ver ) {
293                 case 1:
294                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
295                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
296                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
297
298                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
299                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
300                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
301                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
302                         msg->state = RMR_OK;
303                         hlen = sizeof( uta_v1mhdr_t );
304                         break;
305
306                 default:                                                                                                        // current version always lands here
307                         hdr = (uta_mhdr_t *) msg->header;
308                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
309                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
310
311                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
312                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
313                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
314                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
315                         msg->sub_id = ntohl( hdr->sub_id );
316                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
317                         break;
318         }
319
320         if( msg->len > (msg->alloc_len - hlen ) ) {
321                 msg->state = RMR_ERR_TRUNC;
322                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
323         } else {
324                 msg->state = RMR_OK;
325         }
326 }
327
328 /*
329         This will clone a message into a new zero copy buffer and return the cloned message.
330 */
331 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
332         rmr_mbuf_t* nm;                 // new message buffer
333         size_t  mlen;
334         int state;
335         uta_mhdr_t* hdr;
336         uta_v1mhdr_t* v1hdr;
337
338         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
339         if( nm == NULL ) {
340                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
341                 exit( 1 );
342         }
343         memset( nm, 0, sizeof( *nm ) );
344
345         mlen = old_msg->alloc_len;                                                                              // length allocated before
346         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
347                 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
348                 abort();
349         }
350
351         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
352         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
353         switch( ntohl( v1hdr->rmr_ver ) ) {
354                 case 1:
355                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
356                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
357                         break;
358
359                 default:                                                                                        // current message always caught  here
360                         hdr = nm->header;
361                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
362                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
363                         break;
364         }
365
366         // --- these are all version agnostic -----------------------------------
367         nm->mtype = old_msg->mtype;
368         nm->sub_id = old_msg->sub_id;
369         nm->len = old_msg->len;                                                                 // length of data in the payload
370         nm->alloc_len = mlen;                                                                   // length of allocated payload
371
372         nm->xaction = &hdr->xid[0];                                                             // reference xaction
373         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
374         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
375         memcpy( nm->payload, old_msg->payload, old_msg->len );
376
377         return nm;
378 }
379
380 /*
381         This will clone a message with a change to the trace area in the header such that
382         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
383         The orignal message will be left unchanged, and a pointer to the new message is returned.
384         It is not possible to realloc buffers and change the data sizes.
385 */
386 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
387         rmr_mbuf_t* nm;                 // new message buffer
388         size_t  mlen;
389         int state;
390         uta_mhdr_t* hdr;
391         uta_v1mhdr_t* v1hdr;
392         int     tr_old_len;                     // tr size in new buffer
393         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
394         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
395
396         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
397         if( nm == NULL ) {
398                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
399                 exit( 1 );
400         }
401         memset( nm, 0, sizeof( *nm ) );
402
403         hdr = old_msg->header;
404         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
405
406         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
407         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
408
409         tpb_len = mlen + TP_HDR_LEN;
410         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
411                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
412                 exit( 1 );
413         }
414         if( DEBUG ) {
415                 memset( nm->tp_buf, 0, tpb_len );
416                 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );         // DEBUGGING do NOT use $ in this string!!
417         }
418
419         insert_mlen( (uint32_t) tpb_len, nm->tp_buf );                  // this len will likely be reset on send to shrink
420
421         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
422
423         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
424         switch( ntohl( v1hdr->rmr_ver ) ) {
425                 case 1:
426                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
427                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
428                         break;
429
430                 default:                                                                                        // current message version always caught  here
431                         hdr = nm->header;
432                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
433                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
434
435                         if( RMR_D1_LEN( hdr )  ) {
436                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
437                         }
438                         if( RMR_D2_LEN( hdr )  ) {
439                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
440                         }
441
442                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
443                         break;
444         }
445
446         // --- these are all version agnostic -----------------------------------
447         nm->mtype = old_msg->mtype;
448         nm->sub_id = old_msg->sub_id;
449         nm->len = old_msg->len;                                                                 // length of data in the payload
450         nm->alloc_len = mlen;                                                                   // length of allocated payload
451
452         nm->xaction = &hdr->xid[0];                                                             // reference xaction
453         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
454         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
455         memcpy( nm->payload, old_msg->payload, old_msg->len );
456
457         return nm;
458 }
459
460 /*
461         Realloc the message such that the payload is at least payload_len bytes.
462         The clone and copy options affect what portion of the original payload is copied to
463         the reallocated message, and whether or not the original payload is lost after the
464         reallocation process has finished.
465
466                 copy == true
467                 The entire payload from the original message will be coppied to the reallocated
468                 payload.
469
470                 copy == false
471                 Only the header (preserving return to sender information, message type, etc)
472                 is preserved after reallocation; the payload used lengrh is set to 0 and the
473                 payload is NOT initialised/cleared.
474
475                 clone == true
476                 The orignal message is preserved and a completely new message buffer and payload
477                 are allocated (even if the size given is the same). A pointer to the new message
478                 buffer is returned and it is the user application's responsibility to manage the
479                 old buffer (e.g. free when not needed).
480
481                 clone == false
482                 The old payload will be lost after reallocation. The message buffer pointer which
483                 is returned will likely reference the same structure (don't depend on that).
484
485
486         CAUTION:
487         If the message is not a message which was received, the mtype, sub-id, length values in the
488         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
489         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
490         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
491         settings and NOT from the copied RMR header in transport buffer.
492 */
493 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
494         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
495         size_t  mlen;
496         uta_mhdr_t* omhdr;              // old message header
497         int             tr_old_len;             // tr size in new buffer
498         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
499         int             hdr_len = 0;    // length of RMR and transport headers in old msg
500         void*   old_tp_buf;             // pointer to the old tp buffer
501         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
502         int             old_mt;                 // msg type and sub-id from the message passed in
503         int             old_sid;
504         int             old_len;
505         int             old_rfd;                // rts file descriptor from old message
506
507         if( old_msg == NULL || payload_len <= 0 ) {
508                 errno = EINVAL;
509                 return NULL;
510         }
511
512         old_mt = old_msg->mtype;                        // preserve mbuf info
513         old_sid = old_msg->sub_id;
514         old_len = old_msg->len;
515         old_rfd = old_msg->rts_fd;
516
517         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
518
519         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
520                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
521                 return old_msg;
522         }
523
524         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
525         old_tp_buf = old_msg->tp_buf;
526
527         if( clone ) {
528                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
529                 free_tp = 0;
530
531                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
532                 if( nm == NULL ) {
533                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
534                         return NULL;
535                 }
536                 memset( nm, 0, sizeof( *nm ) );
537                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
538         } else {
539                 nm = old_msg;
540         }
541
542         omhdr = old_msg->header;
543         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
544
545         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
546         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
547                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
548                 free( nm );
549                 return NULL;
550         }
551
552         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
553         SET_HDR_LEN( nm->header );
554
555         if( copy ) {                                                                                                                            // if we need to copy the old payload too
556                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
557                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
558         } else {                                                                                                                                        // just need to copy header
559                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
560                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
561         }
562
563         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
564
565         if( !copy ) {
566                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
567                 nm->sub_id = -1;
568                 nm->len = 0;                                            // and len is 0
569         } else {
570                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
571                 nm->mtype = old_mt;
572                 nm->sub_id = old_sid;
573         }
574
575         if( free_tp ) {
576                 free( old_tp_buf );                             // we did not clone, so free b/c no references
577         }
578
579         return nm;
580 }
581
582 /*
583         For SI95 based transport all receives are driven through the threaded
584         ring and thus this function should NOT be called. If it is we will panic
585         and abort straight away.
586 */
587 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
588
589 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
590 exit( 1 );
591
592         return NULL;
593 }
594
595 /*
596         This does the hard work of actually sending the message to the given socket. On success,
597         a new message struct is returned. On error, the original msg is returned with the state
598         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
599         buffer will not be allocated and returned (mostly for call() interal processing since
600         the return message from call() is a received buffer, not a new one).
601
602         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
603         validation has been done prior.
604
605         When msg->state is not ok, this function must set tp_state in the message as some API
606         fucntions return the message directly and do not propigate errno into the message.
607 */
608 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
609         int state;
610         uta_mhdr_t*     hdr;
611         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
612         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
613         int tot_len;                                                    // total send length (hdr + user data + tp header)
614
615         // future: ensure that application did not overrun the XID buffer; last byte must be 0
616
617         hdr = (uta_mhdr_t *) msg->header;
618         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
619         hdr->sub_id = htonl( msg->sub_id );
620         hdr->plen = htonl( msg->len );
621         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
622
623         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
624                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
625                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
626         }
627
628         if( retries == 0 ) {
629                 spin_retries = 100;
630                 retries++;
631         }
632
633         errno = 0;
634         msg->state = RMR_OK;
635         do {
636                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
637                 if( tot_len > msg->alloc_len ) {
638                         tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
639                 }
640                 insert_mlen( tot_len, msg->tp_buf );    // shrink to fit
641
642                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
643                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
644
645                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
646                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
647                         msg->state = state;
648                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
649                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
650                                         retries--;
651                                         if( retries > 0 ) {                                     // only if we'll loop through again
652                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
653                                         }
654                                         spin_retries = 1000;
655                                 }
656                         } else {
657                                 state = 0;                      // don't loop
658                         }
659                 } else {
660                         if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
661                         state = 0;
662                         msg->state = RMR_OK;
663                         hdr = NULL;
664                 }
665         } while( state && retries > 0 );
666
667         if( msg->state == RMR_OK ) {                                                                    // successful send
668                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
669                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
670                 } else {
671                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
672                         return NULL;
673                 }
674         } else {                                                                                        // send failed -- return original message
675                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
676                         errno = EAGAIN;
677                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
678                 } else {
679                         msg->state = RMR_ERR_SENDFAILED;
680                 }
681
682                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
683         }
684
685         return msg;
686 }
687
688 /*
689         send message with maximum timeout.
690         Accept a message and send it to an endpoint based on message type.
691         If NNG reports that the send attempt timed out, or should be retried,
692         RMr will retry for approximately max_to microseconds; rounded to the next
693         higher value of 10.
694
695         Allocates a new message buffer for the next send. If a message type has
696         more than one group of endpoints defined, then the message will be sent
697         in round robin fashion to one endpoint in each group.
698
699         An endpoint will be looked up in the route table using the message type and
700         the subscription id. If the subscription id is "UNSET_SUBID", then only the
701         message type is used.  If the initial lookup, with a subid, fails, then a
702         second lookup using just the mtype is tried.
703
704         When msg->state is not OK, this function must set tp_state in the message as
705         some API fucntions return the message directly and do not propigate errno into
706         the message.
707
708         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
709                 it will return with an error and errno set to eagain. If the send is
710                 a limited fanout, then the returned status is the status of the last
711                 send attempt.
712
713 */
714 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
715         endpoint_t*     ep;                                     // end point that we're attempting to send to
716         rtable_ent_t*   rte;                    // the route table entry which matches the message key
717         int     nn_sock;                                        // endpoint socket (fd in si case) for send
718         uta_ctx_t*      ctx;
719         int                     group;                          // selected group to get socket for
720         int                     send_again;                     // true if the message must be sent again
721         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
722         int                     sock_ok;                        // got a valid socket from round robin select
723         char*           d1;
724         int                     ok_sends = 0;           // track number of ok sends
725
726         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
727                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
728                 if( msg != NULL ) {
729                         msg->state = RMR_ERR_BADARG;
730                         errno = EINVAL;                                                                                 // must ensure it's not eagain
731                         msg->tp_state = errno;
732                 }
733                 return msg;
734         }
735
736         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
737         if( msg->header == NULL ) {
738                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
739                 msg->state = RMR_ERR_NOHDR;
740                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
741                 msg->tp_state = errno;
742                 return msg;
743         }
744
745         if( max_to < 0 ) {
746                 max_to = ctx->send_retries;             // convert to retries
747         }
748
749         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
750                 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
751                 msg->state = RMR_ERR_NOENDPT;
752                 errno = ENXIO;                                                                          // must ensure it's not eagain
753                 msg->tp_state = errno;
754                 return msg;                                                                                     // caller can resend (maybe) or free
755         }
756
757         send_again = 1;                                                                                 // force loop entry
758         group = 0;                                                                                              // always start with group 0
759         while( send_again ) {
760                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
761                         sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
762                 } else {
763                         sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
764                         send_again = 0;
765                 }
766
767                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
768                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
769
770                 group++;
771
772                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
773                         if( send_again ) {
774                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
775                                 if( clone_m == NULL ) {
776                                         msg->state = RMR_ERR_SENDFAILED;
777                                         errno = ENOMEM;
778                                         msg->tp_state = errno;
779                                         rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
780                                         return msg;
781                                 }
782
783                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
784                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
785                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
786
787                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
788                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
789                                         msg = clone_m;
790                                 } else {
791                                         ok_sends++;
792                                         msg = clone_m;                                                                          // clone will be the next to send
793                                         msg->state = RMR_OK;
794                                 }
795                         } else {
796                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
797                                 if( DEBUG ) {
798                                         if( msg == NULL ) {
799                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );
800                                         }
801                                 }
802                         }
803
804                         if( ep != NULL && msg != NULL ) {
805                                 switch( msg->state ) {
806                                         case RMR_OK:
807                                                 ep->scounts[EPSC_GOOD]++;
808                                                 break;
809         
810                                         case RMR_ERR_RETRY:
811                                                 ep->scounts[EPSC_TRANS]++;
812                                                 break;
813
814                                         default:
815                                                 ep->scounts[EPSC_FAIL]++;
816                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
817                                                 break;
818                                 }
819                         }
820                 } else {
821                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
822                         msg->state = RMR_ERR_NOENDPT;
823                         errno = ENXIO;
824                 }
825         }
826
827         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
828                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
829                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
830                         msg->state = RMR_OK;
831                 }
832
833                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
834
835                 msg->tp_state = errno;
836         }
837
838         return msg;                                                                     // last message caries the status of last/only send attempt
839 }
840
841
842 /*
843         A generic wrapper to the real send to keep wormhole stuff agnostic.
844         We assume the wormhole function vetted the buffer so we don't have to.
845 */
846 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
847         return send_msg( ctx, msg, ep->nn_sock, -1 );
848 }
849
850 #endif