Fix rmr_call() parameter checking bug
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       sr_si_static.c
23         Abstract:       These are static send/receive primatives which (sadly)
24                                 differ based on the underlying protocol (nng vs SI95).
25                                 Split from rmr_nng.c  for easier wormhole support.
26
27         Author:         E. Scott Daniels
28         Date:           13 February 2019
29 */
30
31 #ifndef _sr_si_static_c
32 #define _sr_si_static_c
33
34 static void dump_n( char *p, char* label, int n ) {
35         int i;
36         int j;
37         int t = 0;
38         int     rows;
39
40
41         if( label ) {
42                 fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
43         }
44
45         rows = (n/16) + ((n % 16) ? 1 : 0);
46
47         for( j = 0; j < rows; j++ ) {
48                 fprintf( stderr, "[DUMP] %04x: ", j * 16 );
49
50                 for( i = 0; t < n && i < 16; i++, t++ ) {
51                         fprintf( stderr, "%02x ", (unsigned char) *p );
52                         p++;
53                 }
54                 fprintf( stderr, "\n" );
55         }
56 }
57
58 /*
59         backwards compatability.
60 */
61 static void dump_40( char *p, char* label ) {
62         dump_n( p, label, 40 );
63 }
64
65 /*
66         Translates the nng state passed in to one of ours that is suitable to put
67         into the message, and sets errno to something that might be useful.
68         If we don't have a specific RMr state, then we return the default (e.g.
69         receive failed).
70
71         The addition of the connection shut error code to the switch requires
72         that the NNG version at commit e618abf8f3db2a94269a (or after) be
73         used for compiling RMR.
74 */
75 static inline int xlate_si_state( int state, int def_state ) {
76
77         switch( state ) {
78                 case SI_ERR_NONE:
79                         errno = 0;
80                         state = RMR_OK;
81                         break;
82
83                 default:
84                         state = def_state;
85                         errno = EAGAIN;
86                         break;
87
88                 case SI_ERR_QUEUED:
89                 case SI_ERR_TPORT:
90                 case SI_ERR_UPORT:
91                 case SI_ERR_FORK:
92                 case SI_ERR_HANDLE:
93                 case SI_ERR_SESSID:
94                 case SI_ERR_TP:
95                 case SI_ERR_SHUTD:
96                 case SI_ERR_NOFDS:
97                 case SI_ERR_SIGUSR1:
98                 case SI_ERR_SIGUSR2:
99                 case SI_ERR_DISC:
100                 case SI_ERR_TIMEOUT:
101                 case SI_ERR_ORDREL:
102                 case SI_ERR_SIGALRM:
103                 case SI_ERR_NOMEM:
104                 case SI_ERR_ADDR:
105                         errno  = ENOTSUP;
106                         state = def_state;
107                         break;
108         }
109
110         return state;
111 }
112
113 /*
114         Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
115         this will put in the size such that it is compatable with old versions
116         of RMR (that expect the message size to not be in network byte order)
117         and with new versions that do. See extract function in mt_call_si_static.c
118         for details on what ends up in the buffer.
119 */
120 static inline void insert_mlen( uint32_t len, char* buf ) {
121         uint32_t* blen;                                                 // pointer into buffer where we'll add the len
122
123         blen = (uint32_t *) buf;                                // old systems expect an unconverted integer
124         *blen = len;
125
126         blen++;
127         *blen = htonl( len );                                   // new systems want a converted integer
128
129         buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER;   // marker to flag this is generated by a new message
130 }
131
132 /*
133         Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
134         a new message struct as well. Size is the size of the zc buffer to allocate (not
135         including our header). If size is 0, then the buffer allocated is the size previously
136         allocated (if msg is !nil) or the default size given at initialisation).
137
138         The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
139         the context value is used.
140
141         NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
142                 are zero copy, however ONLY the message is zero copy. We now allocate and use
143                 nng messages.
144 */
145 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
146         size_t          mlen = -1;                      // size of the transport buffer that we'll allocate
147         uta_mhdr_t*     hdr;                    // convenience pointer
148         int                     tr_len;                 // trace data len (default or override)
149         int*            alen;                   // convenience pointer to set allocated len
150
151         tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
152
153         mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
154         mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
155         mlen = sizeof( char ) * (mlen + TP_HDR_LEN);                                            // finally add the transport header len
156
157         if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
158                 msg = (rmr_mbuf_t *) malloc( sizeof *msg );
159                 if( msg == NULL ) {
160                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
161                         return NULL;                                                            // we used to exit -- that seems wrong
162                 }
163                 memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
164         } else {                                                                // user message or message from the ring
165                 if( mlen > msg->alloc_len ) {           // current allocation is too small
166                         msg->alloc_len = 0;                             // force tp_buffer realloc below
167                         if( msg->tp_buf ) {
168                                 free( msg->tp_buf );
169                                 msg->tp_buf = NULL;
170                         }
171                 } else {
172                         mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
173                 }
174         }
175
176         msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
177
178         if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
179                 rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
180                 abort( );                                                                                       // toss out a core file for this
181         }
182
183         if( DEBUG ) {
184                 // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
185                 memset( msg->tp_buf, 0, mlen );
186                 memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );                // do NOT use a $ in this string!
187         }
188
189         insert_mlen( (uint32_t) mlen, msg->tp_buf );                    // this will likely be overwriten on send to shirnk
190
191         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
192         memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
193         if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
194                 hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
195                 hdr->sub_id = htonl( UNSET_SUBID );
196                 SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
197                 SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
198                 SET_HDR_D1_LEN( hdr, ctx->d1_len );
199                 //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
200         }
201         msg->len = 0;                                                                                   // length of data in the payload
202         msg->cookie = 0x4942;
203         msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
204         msg->sub_id = UNSET_SUBID;
205         msg->mtype = UNSET_MSGTYPE;
206         msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
207         msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
208         msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
209         msg->flags = MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
210         msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
211         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
212         strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
213
214         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
215
216         return msg;
217 }
218
219 /*
220         Allocates only the mbuf and does NOT allocate an underlying transport buffer since
221         transport receive should allocate that on its own.
222 */
223 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
224         size_t  mlen;
225         uta_mhdr_t* hdr;                        // convenience pointer
226         rmr_mbuf_t* msg;
227
228         if( (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) != NULL ) {
229                 if( msg->tp_buf ) {
230                         free( msg->tp_buf );            // caller doesn't want it -- future put this on an accumulation ring
231                 }
232         } else {
233                 if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
234                         rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
235                         return NULL;                                                    // this used to exit, but that seems wrong
236                 }
237         }
238
239         memset( msg, 0, sizeof( *msg ) );
240
241         msg->cookie = 0x4942;
242         msg->sub_id = UNSET_SUBID;
243         msg->mtype = UNSET_MSGTYPE;
244         msg->tp_buf = NULL;
245         msg->header = NULL;
246         msg->len = -1;                                                                                  // no payload; invalid len
247         msg->alloc_len = 0;
248         msg->payload = NULL;
249         msg->xaction = NULL;
250         msg->state = state;
251         msg->flags = 0;
252         msg->ring = ctx->zcb_mring;                                                     // original msg_free() api doesn't get context so must dup on eaach :(
253
254         return msg;
255 }
256
257 /*
258         This accepts a message with the assumption that only the tp_buf pointer is valid. It
259         sets all of the various header/payload/xaction pointers in the mbuf to the proper
260         spot in the transport layer buffer.  The len in the header is assumed to be the
261         allocated len (a receive buffer that nng created);
262
263         The alen parm is the assumed allocated length; assumed because it's a value likely
264         to have come from si receive and the actual alloc len might be larger, but we
265         can only assume this is the total usable space. Because we are managing a transport
266         header in the first n bytes of the real msg, we must adjust this length down by the
267         size of the tp header (for testing 50 bytes, but this should change to a struct if
268         we adopt this interface).
269
270         This function returns the message with an error state set if it detects that the
271         received message might have been truncated.  Check is done here as the calculation
272         is somewhat based on header version.
273 */
274 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
275         uta_mhdr_t* hdr = NULL;                 // current header
276         uta_v1mhdr_t* v1hdr;                    // version 1 header
277         int ver;
278         int     hlen;                                           // header len to use for a truncation check
279
280         msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
281
282         v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
283
284         if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
285                 ver = 1;
286                 v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
287         } else {
288                 ver = ntohl( v1hdr->rmr_ver );
289         }
290
291         switch( ver ) {
292                 case 1:
293                         msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
294                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
295                         msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
296
297                         msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
298                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
299                         msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
300                         msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
301                         msg->state = RMR_OK;
302                         hlen = sizeof( uta_v1mhdr_t );
303                         break;
304
305                 default:                                                                                                        // current version always lands here
306                         hdr = (uta_mhdr_t *) msg->header;
307                         msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
308                         msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
309
310                         msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
311                         msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
312                         msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
313                         msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
314                         msg->sub_id = ntohl( hdr->sub_id );
315                         hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
316                         break;
317         }
318
319         if( msg->len > (msg->alloc_len - hlen ) ) {
320                 msg->state = RMR_ERR_TRUNC;
321                 msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
322         } else {
323                 msg->state = RMR_OK;
324         }
325 }
326
327 /*
328         This will clone a message into a new zero copy buffer and return the cloned message.
329 */
330 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
331         rmr_mbuf_t* nm;                 // new message buffer
332         size_t  mlen;
333         int state;
334         uta_mhdr_t* hdr;
335         uta_v1mhdr_t* v1hdr;
336
337         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
338         if( nm == NULL ) {
339                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
340                 exit( 1 );
341         }
342         memset( nm, 0, sizeof( *nm ) );
343
344         mlen = old_msg->alloc_len;                                                                              // length allocated before
345         if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
346                 rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
347                 abort();
348         }
349
350         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
351         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
352         switch( ntohl( v1hdr->rmr_ver ) ) {
353                 case 1:
354                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
355                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
356                         break;
357
358                 default:                                                                                        // current message always caught  here
359                         hdr = nm->header;
360                         memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
361                         nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
362                         break;
363         }
364
365         // --- these are all version agnostic -----------------------------------
366         nm->mtype = old_msg->mtype;
367         nm->sub_id = old_msg->sub_id;
368         nm->len = old_msg->len;                                                                 // length of data in the payload
369         nm->alloc_len = mlen;                                                                   // length of allocated payload
370
371         nm->xaction = &hdr->xid[0];                                                             // reference xaction
372         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
373         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
374         memcpy( nm->payload, old_msg->payload, old_msg->len );
375
376         return nm;
377 }
378
379 /*
380         This will clone a message with a change to the trace area in the header such that
381         it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
382         The orignal message will be left unchanged, and a pointer to the new message is returned.
383         It is not possible to realloc buffers and change the data sizes.
384 */
385 static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
386         rmr_mbuf_t* nm;                 // new message buffer
387         size_t  mlen;
388         int state;
389         uta_mhdr_t* hdr;
390         uta_v1mhdr_t* v1hdr;
391         int     tr_old_len;                     // tr size in new buffer
392         int*    alen;                   // convenience pointer to set toal xmit len FIX ME!
393         int             tpb_len;                // total transmit buffer len (user space, rmr header and tp header)
394
395         nm = (rmr_mbuf_t *) malloc( sizeof *nm );
396         if( nm == NULL ) {
397                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
398                 exit( 1 );
399         }
400         memset( nm, 0, sizeof( *nm ) );
401
402         hdr = old_msg->header;
403         tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
404
405         mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
406         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
407
408         tpb_len = mlen + TP_HDR_LEN;
409         if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
410                 rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
411                 exit( 1 );
412         }
413         if( DEBUG ) {
414                 memset( nm->tp_buf, 0, tpb_len );
415                 memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );         // DEBUGGING do NOT use $ in this string!!
416         }
417
418         insert_mlen( (uint32_t) tpb_len, nm->tp_buf );                  // this len will likely be reset on send to shrink
419
420         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
421
422         v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
423         switch( ntohl( v1hdr->rmr_ver ) ) {
424                 case 1:
425                         memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
426                         nm->payload = (void *) v1hdr + sizeof( *v1hdr );
427                         break;
428
429                 default:                                                                                        // current message version always caught  here
430                         hdr = nm->header;
431                         memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
432                         SET_HDR_TR_LEN( hdr, tr_len );                                                          // must adjust trace len in new message before copy
433
434                         if( RMR_D1_LEN( hdr )  ) {
435                                 memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) );           // copy data1 and data2 if necessary
436                         }
437                         if( RMR_D2_LEN( hdr )  ) {
438                                 memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
439                         }
440
441                         nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
442                         break;
443         }
444
445         // --- these are all version agnostic -----------------------------------
446         nm->mtype = old_msg->mtype;
447         nm->sub_id = old_msg->sub_id;
448         nm->len = old_msg->len;                                                                 // length of data in the payload
449         nm->alloc_len = mlen;                                                                   // length of allocated payload
450
451         nm->xaction = &hdr->xid[0];                                                             // reference xaction
452         nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
453         nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
454         memcpy( nm->payload, old_msg->payload, old_msg->len );
455
456         return nm;
457 }
458
459 /*
460         Realloc the message such that the payload is at least payload_len bytes.
461         The clone and copy options affect what portion of the original payload is copied to
462         the reallocated message, and whether or not the original payload is lost after the
463         reallocation process has finished.
464
465                 copy == true
466                 The entire payload from the original message will be coppied to the reallocated
467                 payload.
468
469                 copy == false
470                 Only the header (preserving return to sender information, message type, etc)
471                 is preserved after reallocation; the payload used lengrh is set to 0 and the
472                 payload is NOT initialised/cleared.
473
474                 clone == true
475                 The orignal message is preserved and a completely new message buffer and payload
476                 are allocated (even if the size given is the same). A pointer to the new message
477                 buffer is returned and it is the user application's responsibility to manage the
478                 old buffer (e.g. free when not needed).
479
480                 clone == false
481                 The old payload will be lost after reallocation. The message buffer pointer which
482                 is returned will likely reference the same structure (don't depend on that).
483
484
485         CAUTION:
486         If the message is not a message which was received, the mtype, sub-id, length values in the
487         RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
488         mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
489         resetting information in the mbuffer, this funciton will reset the mbuf values from the current
490         settings and NOT from the copied RMR header in transport buffer.
491 */
492 static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
493         rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
494         size_t  mlen;
495         uta_mhdr_t* omhdr;              // old message header
496         int             tr_old_len;             // tr size in new buffer
497         int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
498         int             hdr_len = 0;    // length of RMR and transport headers in old msg
499         void*   old_tp_buf;             // pointer to the old tp buffer
500         int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
501         int             old_mt;                 // msg type and sub-id from the message passed in
502         int             old_sid;
503         int             old_len;
504         int             old_rfd;                // rts file descriptor from old message
505
506         if( old_msg == NULL || payload_len <= 0 ) {
507                 errno = EINVAL;
508                 return NULL;
509         }
510
511         old_mt = old_msg->mtype;                        // preserve mbuf info
512         old_sid = old_msg->sub_id;
513         old_len = old_msg->len;
514         old_rfd = old_msg->rts_fd;
515
516         old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
517
518         if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
519                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
520                 return old_msg;
521         }
522
523         hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
524         old_tp_buf = old_msg->tp_buf;
525
526         if( clone ) {
527                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
528                 free_tp = 0;
529
530                 nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
531                 if( nm == NULL ) {
532                         rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
533                         return NULL;
534                 }
535                 memset( nm, 0, sizeof( *nm ) );
536                 nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
537         } else {
538                 nm = old_msg;
539         }
540
541         omhdr = old_msg->header;
542         mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
543
544         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
545         if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
546                 rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
547                 free( nm );
548                 return NULL;
549         }
550
551         nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
552         SET_HDR_LEN( nm->header );
553
554         if( copy ) {                                                                                                                            // if we need to copy the old payload too
555                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
556                 memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
557         } else {                                                                                                                                        // just need to copy header
558                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
559                 memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
560         }
561
562         ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
563
564         if( !copy ) {
565                 nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
566                 nm->sub_id = -1;
567                 nm->len = 0;                                            // and len is 0
568         } else {
569                 nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
570                 nm->mtype = old_mt;
571                 nm->sub_id = old_sid;
572         }
573
574         if( free_tp ) {
575                 free( old_tp_buf );                             // we did not clone, so free b/c no references
576         }
577
578         return nm;
579 }
580
581 /*
582         For SI95 based transport all receives are driven through the threaded
583         ring and thus this function should NOT be called. If it is we will panic
584         and abort straight away.
585 */
586 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
587
588 fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
589 exit( 1 );
590
591         return NULL;
592 }
593
594 /*
595         This does the hard work of actually sending the message to the given socket. On success,
596         a new message struct is returned. On error, the original msg is returned with the state
597         set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
598         buffer will not be allocated and returned (mostly for call() interal processing since
599         the return message from call() is a received buffer, not a new one).
600
601         Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
602         validation has been done prior.
603
604         When msg->state is not ok, this function must set tp_state in the message as some API
605         fucntions return the message directly and do not propigate errno into the message.
606 */
607 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
608         int state;
609         uta_mhdr_t*     hdr;
610         int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
611         int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
612         int tot_len;                                                    // total send length (hdr + user data + tp header)
613
614         // future: ensure that application did not overrun the XID buffer; last byte must be 0
615
616         hdr = (uta_mhdr_t *) msg->header;
617         hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
618         hdr->sub_id = htonl( msg->sub_id );
619         hdr->plen = htonl( msg->len );
620         tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
621
622         if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
623                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
624                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
625         }
626
627         if( retries == 0 ) {
628                 spin_retries = 100;
629                 retries++;
630         }
631
632         errno = 0;
633         msg->state = RMR_OK;
634         do {
635                 tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
636                 if( tot_len > msg->alloc_len ) {
637                         tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
638                 }
639                 insert_mlen( tot_len, msg->tp_buf );    // shrink to fit
640
641                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
642                 if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
643
644                 if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
645                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
646                         msg->state = state;
647                         if( retries > 0 && state == SI_ERR_BLOCKED ) {
648                                 if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
649                                         retries--;
650                                         if( retries > 0 ) {                                     // only if we'll loop through again
651                                                 usleep( 1 );                                    // sigh, give up the cpu and hope it's just 1 miscrosec
652                                         }
653                                         spin_retries = 1000;
654                                 }
655                         } else {
656                                 state = 0;                      // don't loop
657                         }
658                 } else {
659                         if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
660                         state = 0;
661                         msg->state = RMR_OK;
662                         hdr = NULL;
663                 }
664         } while( state && retries > 0 );
665
666         if( msg->state == RMR_OK ) {                                                                    // successful send
667                 if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
668                         return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
669                 } else {
670                         rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
671                         return NULL;
672                 }
673         } else {                                                                                        // send failed -- return original message
674                 if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
675                         errno = EAGAIN;
676                         msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
677                 } else {
678                         msg->state = RMR_ERR_SENDFAILED;
679                 }
680
681                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
682         }
683
684         return msg;
685 }
686
687 /*
688         send message with maximum timeout.
689         Accept a message and send it to an endpoint based on message type.
690         If NNG reports that the send attempt timed out, or should be retried,
691         RMr will retry for approximately max_to microseconds; rounded to the next
692         higher value of 10.
693
694         Allocates a new message buffer for the next send. If a message type has
695         more than one group of endpoints defined, then the message will be sent
696         in round robin fashion to one endpoint in each group.
697
698         An endpoint will be looked up in the route table using the message type and
699         the subscription id. If the subscription id is "UNSET_SUBID", then only the
700         message type is used.  If the initial lookup, with a subid, fails, then a
701         second lookup using just the mtype is tried.
702
703         When msg->state is not OK, this function must set tp_state in the message as
704         some API fucntions return the message directly and do not propigate errno into
705         the message.
706
707         CAUTION: this is a non-blocking send.  If the message cannot be sent, then
708                 it will return with an error and errno set to eagain. If the send is
709                 a limited fanout, then the returned status is the status of the last
710                 send attempt.
711
712 */
713 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
714         endpoint_t*     ep;                                     // end point that we're attempting to send to
715         rtable_ent_t*   rte;                    // the route table entry which matches the message key
716         int     nn_sock;                                        // endpoint socket (fd in si case) for send
717         uta_ctx_t*      ctx;
718         int                     group;                          // selected group to get socket for
719         int                     send_again;                     // true if the message must be sent again
720         rmr_mbuf_t*     clone_m;                        // cloned message for an nth send
721         int                     sock_ok;                        // got a valid socket from round robin select
722         char*           d1;
723         int                     ok_sends = 0;           // track number of ok sends
724
725         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
726                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
727                 if( msg != NULL ) {
728                         msg->state = RMR_ERR_BADARG;
729                         errno = EINVAL;                                                                                 // must ensure it's not eagain
730                         msg->tp_state = errno;
731                 }
732                 return msg;
733         }
734
735         errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
736         if( msg->header == NULL ) {
737                 fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
738                 msg->state = RMR_ERR_NOHDR;
739                 errno = EBADMSG;                                                                                        // must ensure it's not eagain
740                 msg->tp_state = errno;
741                 return msg;
742         }
743
744         if( max_to < 0 ) {
745                 max_to = ctx->send_retries;             // convert to retries
746         }
747
748         if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
749                 rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
750                 msg->state = RMR_ERR_NOENDPT;
751                 errno = ENXIO;                                                                          // must ensure it's not eagain
752                 msg->tp_state = errno;
753                 return msg;                                                                                     // caller can resend (maybe) or free
754         }
755
756         send_again = 1;                                                                                 // force loop entry
757         group = 0;                                                                                              // always start with group 0
758         while( send_again ) {
759                 if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
760                         sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
761                 } else {
762                         sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
763                         send_again = 0;
764                 }
765
766                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
767                                 msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
768
769                 group++;
770
771                 if( sock_ok ) {                                                                                                 // with an rte we _should_ always have a socket, but don't bet on it
772                         if( send_again ) {
773                                 clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
774                                 if( clone_m == NULL ) {
775                                         msg->state = RMR_ERR_SENDFAILED;
776                                         errno = ENOMEM;
777                                         msg->tp_state = errno;
778                                         rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
779                                         return msg;
780                                 }
781
782                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
783                                 msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
784                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
785
786                                 if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
787                                         rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
788                                         msg = clone_m;
789                                 } else {
790                                         ok_sends++;
791                                         msg = clone_m;                                                                          // clone will be the next to send
792                                         msg->state = RMR_OK;
793                                 }
794                         } else {
795                                 msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
796                                 if( DEBUG ) {
797                                         if( msg == NULL ) {
798                                                 rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );
799                                         }
800                                 }
801                         }
802
803                         if( ep != NULL && msg != NULL ) {
804                                 switch( msg->state ) {
805                                         case RMR_OK:
806                                                 ep->scounts[EPSC_GOOD]++;
807                                                 break;
808         
809                                         case RMR_ERR_RETRY:
810                                                 ep->scounts[EPSC_TRANS]++;
811                                                 break;
812
813                                         default:
814                                                 ep->scounts[EPSC_FAIL]++;
815                                                 uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
816                                                 break;
817                                 }
818                         }
819                 } else {
820                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
821                         msg->state = RMR_ERR_NOENDPT;
822                         errno = ENXIO;
823                 }
824         }
825
826         if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
827                 msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
828                 if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
829                         msg->state = RMR_OK;
830                 }
831
832                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
833
834                 msg->tp_state = errno;
835         }
836
837         return msg;                                                                     // last message caries the status of last/only send attempt
838 }
839
840
841 /*
842         A generic wrapper to the real send to keep wormhole stuff agnostic.
843         We assume the wormhole function vetted the buffer so we don't have to.
844 */
845 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg ) {
846         return send_msg( ctx, msg, ep->nn_sock, -1 );
847 }
848
849 #endif