Address complaints by code scanner
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "rtc_static.c"                         // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c"                       // send/receive static functions
73 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
76
77
78 //------------------------------------------------------------------------------
79
80
81 /*
82         Clean up a context.
83 */
84 static void free_ctx( uta_ctx_t* ctx ) {
85         if( ctx && ctx->rtg_addr ) {
86                 free( ctx->rtg_addr );
87         }
88 }
89
90 // --------------- public functions --------------------------------------------------------------------------
91
92 /*
93         Returns the size of the payload (bytes) that the msg buffer references.
94         Len in a message is the number of bytes which were received, or should
95         be transmitted, however, it is possible that the mbuf was allocated
96         with a larger payload space than the payload length indicates; this
97         function returns the absolute maximum space that the user has available
98         in the payload. On error (bad msg buffer) -1 is returned and errno should
99         indicate the rason.
100
101         The allocated len stored in the msg is:
102                 transport header length +
103                 message header +
104                 user requested payload
105
106         The msg header is a combination of the fixed RMR header and the variable
107         trace data and d2 fields which may vary for each message.
108 */
109 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
110         if( msg == NULL || msg->header == NULL ) {
111                 errno = EINVAL;
112                 return -1;
113         }
114
115         errno = 0;
116         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
117 }
118
119 /*
120         Allocates a send message as a zerocopy message allowing the underlying message protocol
121         to send the buffer without copy.
122 */
123 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
124         uta_ctx_t*      ctx;
125         rmr_mbuf_t*     m;
126
127         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
128                 return NULL;
129         }
130
131         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
132         return  m;
133 }
134
135
136 /*
137         Allocates a send message as a zerocopy message allowing the underlying message protocol
138         to send the buffer without copy. In addition, a trace data field of tr_size will be
139         added and the supplied data coppied to the buffer before returning the message to
140         the caller.
141 */
142 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
143         uta_ctx_t*      ctx;
144         rmr_mbuf_t*     m;
145         int state;
146
147         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
148                 return NULL;
149         }
150
151         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
152         if( m != NULL ) {
153                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
154                 if( state != tr_size ) {
155                         m->state = RMR_ERR_INITFAILED;
156                 }
157         }
158
159         return  m;
160 }
161
162 /*
163         This provides an external path to the realloc static function as it's called by an
164         outward facing mbuf api function. Used to reallocate a message with a different
165         trace data size.
166 */
167 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
168         return realloc_msg( msg, new_tr_size );
169 }
170
171
172 /*
173         Return the message to the available pool, or free it outright.
174 */
175 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
176         if( mbuf == NULL ) {
177                 return;
178         }
179
180         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
181                 ! mbuf->ring ||                                                                         // cant cache if no ring
182                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
183
184                 if( mbuf->tp_buf ) {
185                         free( mbuf->tp_buf );
186                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
187                 }
188
189                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
190                 free( mbuf );
191         }
192 }
193
194 /*
195         This is a wrapper to the real timeout send. We must wrap it now to ensure that
196         the call flag and call-id are reset
197 */
198 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
199         char* d1;                                                                                                                       // point at the call-id in the header
200
201         if( msg != NULL ) {
202                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
203
204                 d1 = DATA1_ADDR( msg->header );
205                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
206         }
207
208         return mtosend_msg( vctx, msg, max_to );
209 }
210
211 /*
212         Send with default max timeout as is set in the context.
213         See rmr_mtosend_msg() for more details on the parameters.
214         See rmr_stimeout() for info on setting the default timeout.
215 */
216 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
217         char* d1;                                                                                                               // point at the call-id in the header
218
219         if( msg != NULL ) {
220                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
221
222                 d1 = DATA1_ADDR( msg->header );
223                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
224         }
225
226         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
227 }
228
229 /*
230         Return to sender allows a message to be sent back to the endpoint where it originated.
231
232         With SI95 it was thought that the return to sender would be along the same open conneciton
233         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
234         applications sending at high message rates, returning responses on the same connection
235         causes major strife. Thus the decision was made to use the same method as NNG and just
236         open a second connection for reverse path.
237
238         We will attempt to use the name in the received message to look up the endpoint. If
239         that failes, then we will write on the connection that the message arrived on as a
240         falback.
241
242         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
243         and on error it can be passed back to this function to retry the send if desired. On error,
244         errno will liklely have the failure reason set by the nng send processing.  The following
245         are possible values for the state in the message buffer:
246
247         Message states returned:
248                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
249                 RMR_ERR_NOHDR  - message did not have a header
250                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
251                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
252                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
253
254         A nil message as the return value is rare, and generally indicates some kind of horrible
255         failure. The value of errno might give a clue as to what is wrong.
256
257         CAUTION:
258                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
259                 The caller must check for this and handle it properly.
260 */
261 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
262         int                     nn_sock;                        // endpoint socket for send
263         uta_ctx_t*      ctx;
264         char*           hold_src;                       // we need the original source if send fails
265         char*           hold_ip;                        // also must hold original ip
266         int                     sock_ok = 0;            // true if we found a valid endpoint socket
267         endpoint_t*     ep = NULL;                      // end point to track counts
268
269         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
270                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
271                 if( msg != NULL ) {
272                         msg->state = RMR_ERR_BADARG;
273                         msg->tp_state = errno;
274                 }
275                 return msg;
276         }
277
278         errno = 0;                                                                                                              // at this point any bad state is in msg returned
279         if( msg->header == NULL ) {
280                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
281                 msg->state = RMR_ERR_NOHDR;
282                 msg->tp_state = errno;
283                 return msg;
284         }
285
286         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
287
288         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
289         if( ! sock_ok ) {
290                 if( (nn_sock = msg->rts_fd) < 0 ) {
291                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
292                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
293                         }
294                         if( ! sock_ok ) {
295                                 msg->state = RMR_ERR_NOENDPT;
296                                 return msg;
297                         }
298                 }
299         }
300
301
302         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
303         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
304         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
305         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
306         msg = send_msg( ctx, msg, nn_sock, -1 );
307         if( msg ) {
308                 if( ep != NULL ) {
309                         switch( msg->state ) {
310                                 case RMR_OK:
311                                         ep->scounts[EPSC_GOOD]++;
312                                         break;
313
314                                 case RMR_ERR_RETRY:
315                                         ep->scounts[EPSC_TRANS]++;
316                                         break;
317
318                                 default:
319                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
320                                         ep->scounts[EPSC_FAIL]++;
321                                         break;
322                         }
323                 }
324                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
325                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
326                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
327         }
328
329         free( hold_src );
330         free( hold_ip );
331         return msg;
332 }
333
334 /*
335         If multi-threading call is turned on, this invokes that mechanism with the special call
336         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
337         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
338         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
339         a flexible timeout.
340
341         On timeout this function will return a nil pointer. If the original message could not
342         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
343
344         Original behavour:
345         Call sends the message based on message routing using the message type, and waits for a
346         response message to arrive with the same transaction id that was in the outgoing message.
347         If, while wiating for the expected response,  messages are received which do not have the
348         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
349         order that they were received.
350
351         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
352         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
353         may be resent (likely the context pointer was nil).  If the message is sent, but no
354         response is received, a nil message is returned with errno set to indicate the likley
355         issue:
356                 ETIMEDOUT -- too many messages were queued before reciving the expected response
357                 ENOBUFS -- the queued message ring is full, messages were dropped
358                 EINVAL  -- A parameter was not valid
359                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
360                                         user should call this function with the message again.
361
362 */
363 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
364         uta_ctx_t*              ctx;
365
366         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
367                 if( msg != NULL ) {
368                         msg->state = RMR_ERR_BADARG;
369                 }
370                 return msg;
371         }
372
373     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
374 }
375
376 /*
377         The outward facing receive function. When invoked it will pop the oldest message
378         from the receive ring, if any are queued, and return it. If the ring is empty
379         then the receive function is invoked to wait for the next message to arrive (blocking).
380
381         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
382         nil, a new one will be allocated. However, the caller should NOT expect to get the same
383         struct back (if a queued message is returned the message struct will be different).
384 */
385 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
386         uta_ctx_t*      ctx;
387         rmr_mbuf_t*     qm;                             // message that was queued on the ring
388
389         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
390                 errno = EINVAL;
391                 if( old_msg != NULL ) {
392                         old_msg->state = RMR_ERR_BADARG;
393                         old_msg->tp_state = errno;
394                 }
395                 return old_msg;
396         }
397         errno = 0;
398
399         return rmr_mt_rcv( ctx, old_msg, -1 );
400 }
401
402 /*
403         This allows a timeout based receive for applications unable to implement epoll_wait()
404         (e.g. wrappers).
405 */
406 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
407         uta_ctx_t*      ctx;
408
409         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
410                 errno = EINVAL;
411                 if( old_msg != NULL ) {
412                         old_msg->state = RMR_ERR_BADARG;
413                         old_msg->tp_state = errno;
414                 }
415                 return old_msg;
416         }
417
418         return rmr_mt_rcv( ctx, old_msg, ms_to );
419 }
420
421 /*
422         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
423                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
424                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
425
426         This blocks until the message with the 'expect' ID is received. Messages which are received
427         before the expected message are queued onto the message ring.  The function will return
428         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
429         expected message is received. If the queued message ring fills a nil pointer is returned
430         and errno is set to ENOBUFS.
431
432         Generally this will be invoked only by the call() function as it waits for a response, but
433         it is exposed to the user application as three is no reason not to.
434 */
435 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
436         uta_ctx_t*      ctx;
437         int     queued = 0;                             // number we pushed into the ring
438         int     exp_len = 0;                    // length of expected ID
439
440         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
441                 errno = EINVAL;
442                 if( msg != NULL ) {
443                         msg->state = RMR_ERR_BADARG;
444                         msg->tp_state = errno;
445                 }
446                 return msg;
447         }
448
449         errno = 0;
450
451         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
452                 return rmr_rcv_msg( ctx, msg );
453         }
454
455         exp_len = strlen( expect );
456         if( exp_len > RMR_MAX_XID ) {
457                 exp_len = RMR_MAX_XID;
458         }
459         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
460
461         while( queued < allow2queue ) {
462                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
463                 if( msg != NULL ) {
464                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
465                         if( msg->state == RMR_OK ) {
466                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
467                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
468                                         return msg;
469                                 }
470
471                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
472                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
473                                         errno = ENOBUFS;
474                                         return NULL;
475                                 }
476
477                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
478                                 queued++;
479                                 msg = NULL;
480                         }
481                 }
482         }
483
484         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
485         errno = ETIMEDOUT;
486         return NULL;
487 }
488
489 /*
490         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
491         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
492         mechnism indicates eagain or etimeedout.  All other error conditions are reported
493         without this delay. Setting a timeout of 0 causes no retries to be attempted in
494         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
495         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
496         after every 1K send attempts until the "time" value is reached. Retries are abandoned
497         if NNG returns anything other than EAGAIN or EINTER is returned.
498
499         The default, if this function is not used, is 1; meaning that RMr will retry, but will
500         not enter a sleep.  In all cases the caller should check the status in the message returned
501         after a send call.
502
503         Returns -1 if the context was invalid; RMR_OK otherwise.
504 */
505 extern int rmr_set_stimeout( void* vctx, int time ) {
506         uta_ctx_t*      ctx;
507
508         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
509                 return -1;
510         }
511
512         if( time < 0 ) {
513                 time = 0;
514         }
515
516         ctx->send_retries = time;
517         return RMR_OK;
518 }
519
520 /*
521         Set receive timeout -- not supported in nng implementation
522
523         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
524 */
525 extern int rmr_set_rtimeout( void* vctx, int time ) {
526         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
527         return 0;
528 }
529
530
531 /*
532         This is the actual init workhorse. The user visible function meerly ensures that the
533         calling programme does NOT set any internal flags that are supported, and then
534         invokes this.  Internal functions (the route table collector) which need additional
535         open ports without starting additional route table collectors, will invoke this
536         directly with the proper flag.
537
538         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
539                                 that we know about. The _user_ should ensure that the supplied length also
540                                 includes the trace data length maximum as they are in control of that.
541 */
542 static void* init(  char* uproto_port, int def_msg_size, int flags ) {
543         static  int announced = 0;
544         uta_ctx_t*      ctx = NULL;
545         char    bind_info[256];                         // bind info
546         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
547         char*   port;                                           // pointer into the proto_port buffer at the port value
548         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
549         char*   proto_port;
550         char    wbuf[1024];                                     // work buffer
551         char*   tok;                                            // pointer at token in a buffer
552         char*   tok2;
553         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
554         int             state;
555         int             i;
556         int             old_vlevel;
557
558         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
559
560         if( ! announced ) {
561                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
562                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
563                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
564                 announced = 1;
565
566                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
567                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
568         }
569
570         errno = 0;
571         if( uproto_port == NULL ) {
572                 proto_port = strdup( DEF_COMM_PORT );
573         } else {
574                 proto_port = strdup( uproto_port );             // so we can modify it
575         }
576
577         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
578                 errno = ENOMEM;
579                 return NULL;
580         }
581         memset( ctx, 0, sizeof( uta_ctx_t ) );
582
583         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
584         ctx->nrivers = 256;                                                             // number of input flows we'll manage
585         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
586         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
587         for( i = 0; i < ctx->nrivers; i++ ) {
588                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
589         }
590
591         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
592         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
593         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
594         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
595
596         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
597         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
598
599         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
600                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
601                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
602         } else {
603                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
604         }
605         init_mtcall( ctx );                                                             // set up call chutes
606         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
607
608
609         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
610         if( def_msg_size > 0 ) {
611                 ctx->max_plen = def_msg_size;
612         }
613
614         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
615         if( ctx->si_ctx == NULL ) {
616                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
617                 free_ctx( ctx );
618                 return NULL;
619         }
620
621         if( (port = strchr( proto_port, ':' )) != NULL ) {
622                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
623                         port++;
624                 } else {
625                         *(port++) = 0;                  // term proto string and point at port string
626                         proto = proto_port;             // user supplied proto so point at it rather than default
627                 }
628         } else {
629                 port = proto_port;                      // assume something like "1234" was passed
630         }
631
632         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
633                 static_rtc = 1;
634         }
635
636         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
637                 tok = strdup( tok );                                    // something we can destroy
638                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
639                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
640                 } else {
641                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
642                 }
643                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
644                         *tok2 = 0;                                                      // make it so
645                 }
646
647                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
648                 free( tok );
649         } else {
650                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
651                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
652                         free_ctx( ctx );
653                         return NULL;
654                 }
655                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
656                         *tok = 0;                                                                       // we don't keep domain portion
657                 }
658         }
659
660         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
661         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
662                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
663                 free( proto_port );                                     // some scanners complain that port is not freed; it CANNOT be
664                 return NULL;
665         }
666
667         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
668                 if( atoi( tok ) > 0 ) {
669                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
670                 }
671         }
672
673         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
674         if( flags & RMRFL_NAME_ONLY ) {
675                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
676         } else {
677                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
678                 if( ctx->my_ip == NULL ) {
679                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
680                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
681                 }
682         }
683         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
684
685         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
686                 if( *tok == '1' ) {
687                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
688                 }
689         }
690
691
692         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
693                 interface = "0.0.0.0";
694         }
695
696         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
697         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
698                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
699                 free_ctx( ctx );
700                 return NULL;
701         }
702
703                                                                                                 // finish all flag setting before threads to keep helgrind quiet
704         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
705
706
707         // ---------------- setup for route table collector before invoking ----------------------------------
708         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
709         if( ctx->rtgate != NULL ) {
710                 pthread_mutex_init( ctx->rtgate, NULL );
711         }
712
713         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
714         if( ctx->ephash == NULL ) {
715                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
716                 free_ctx( ctx );
717                 return NULL;
718         }
719
720         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
721         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
722                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
723         } else {
724                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
725
726                 if( static_rtc ) {
727                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
728                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
729                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
730                         }
731                 } else {
732                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
733                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
734                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
735                         }
736                 }
737         }
738
739         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
740                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
741         }
742
743         free( proto_port );
744         return (void *) ctx;
745 }
746
747 /*
748         Initialise the message routing environment. Flags are one of the UTAFL_
749         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
750         (tcp) to be used, then :port is all that is needed.
751
752         At the moment it seems that TCP really is the only viable protocol, but
753         we'll allow flexibility.
754
755         The return value is a void pointer which must be passed to most uta functions. On
756         error, a nil pointer is returned and errno should be set.
757
758         Flags:
759                 No user flags supported (needed) at the moment, but this provides for extension
760                 without drastically changing anything. The user should invoke with RMRFL_NONE to
761                 avoid any misbehavour as there are internal flags which are suported
762 */
763 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
764         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
765 }
766
767 /*
768         This sets the default trace length which will be added to any message buffers
769         allocated.  It can be set at any time, and if rmr_set_trace() is given a
770         trace len that is different than the default allcoated in a message, the message
771         will be resized.
772
773         Returns 0 on failure and 1 on success. If failure, then errno will be set.
774 */
775 extern int rmr_init_trace( void* vctx, int tr_len ) {
776         uta_ctx_t* ctx;
777
778         errno = 0;
779         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
780                 errno = EINVAL;
781                 return 0;
782         }
783
784         ctx->trace_data_len = tr_len;
785         return 1;
786 }
787
788 /*
789         Return true if routing table is initialised etc. and app can send/receive.
790 */
791 extern int rmr_ready( void* vctx ) {
792         uta_ctx_t *ctx;
793
794         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
795                 return FALSE;
796         }
797
798         return ctx->rtable_ready;
799 }
800
801 /*
802         This returns the message queue ring's filedescriptor which can be used for
803         calls to epoll.  The user shouild NOT read, write, or close the fd.
804
805         Returns the file descriptor or -1 on error.
806 */
807 extern int rmr_get_rcvfd( void* vctx ) {
808         uta_ctx_t* ctx;
809         int state;
810
811         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
812                 return -1;
813         }
814
815         return uta_ring_getpfd( ctx->mring );
816 }
817
818
819 /*
820         Clean up things.
821
822         There isn't an si_flush() per se, but we can pause, generate
823         a context switch, which should allow the last sent buffer to
824         flow. There isn't exactly an nng_term/close either, so there
825         isn't much we can do.
826 */
827 extern void rmr_close( void* vctx ) {
828         uta_ctx_t *ctx;
829
830         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
831                 return;
832         }
833
834         ctx->shutdown = 1;
835
836         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
837
838         // FIX ME -- how to we turn off si; close all sessions etc?
839         //SIclose( ctx->nn_sock );
840
841 }
842
843
844 // ----- multi-threaded call/receive support -------------------------------------------------
845
846 /*
847         Blocks on the receive ring chute semaphore and then reads from the ring
848         when it is tickled.  If max_wait is -1 then the function blocks until
849         a message is ready on the ring. Else max_wait is assumed to be the number
850         of millaseconds to wait before returning a timeout message.
851 */
852 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
853         uta_ctx_t*      ctx;
854         chute_t*        chute;
855         struct timespec ts;                     // time info if we have a timeout
856         long    new_ms;                         // adjusted mu-sec
857         long    seconds = 0;            // max wait seconds
858         long    nano_sec;                       // max wait xlated to nano seconds
859         int             state;
860         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
861
862         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
863                 errno = EINVAL;
864                 if( mbuf ) {
865                         mbuf->state = RMR_ERR_BADARG;
866                         mbuf->tp_state = errno;
867                 }
868                 return mbuf;
869         }
870
871         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
872
873         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
874
875         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
876                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
877                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
878                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
879                         if( ombuf ) {
880                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
881                         }
882                 } else {
883                         mbuf = ombuf;                                           // return original if it was given with timeout status
884                         if( ombuf != NULL ) {
885                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
886                                 mbuf->len = 0;
887                         }
888                 }
889
890                 if( mbuf != NULL ) {
891                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
892                 }
893
894                 return mbuf;
895         }
896
897         if( ombuf ) {
898                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
899                 ombuf->len = 0;
900         }
901         if( max_wait > 0 ) {
902                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
903
904                 if( max_wait > 999 ) {
905                         seconds = max_wait / 1000;
906                         max_wait -= seconds * 1000;
907                         ts.tv_sec += seconds;
908                 }
909                 if( max_wait > 0 ) {
910                         nano_sec = max_wait * 1000000;
911                         ts.tv_nsec += nano_sec;
912                         if( ts.tv_nsec > 999999999 ) {
913                                 ts.tv_nsec -= 999999999;
914                                 ts.tv_sec++;
915                         }
916                 }
917
918                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
919         }
920
921         errno = EINTR;
922         state = -1;
923         while( state < 0 && errno == EINTR ) {
924                 if( seconds ) {
925                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
926                 } else {
927                         state = sem_wait( &chute->barrier );
928                 }
929         }
930
931         if( state < 0 ) {
932                 mbuf = ombuf;                           // return caller's buffer if they passed one in
933         } else {
934                 errno = 0;                                              // interrupted call state could be left; clear
935                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
936                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
937                         mbuf->state = RMR_OK;
938                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
939
940                         if( ombuf ) {
941                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
942                         }
943                 } else {
944                         errno = ETIMEDOUT;
945                         mbuf = ombuf;                           // no buffer, return user's if there
946                 }
947         }
948
949         if( mbuf ) {
950                 mbuf->tp_state = errno;
951         }
952         return mbuf;
953 }
954
955
956
957
958 /*
959         This is the work horse for the multi-threaded call() function. It supports
960         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
961         for for rmr_mt_call() modulo the caveat below.
962
963         If endpoint is given, then we assume that we're not doing normal route table
964         routing and that we should send directly to that endpoint (probably worm
965         hole).
966 */
967 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
968         rmr_mbuf_t* ombuf;                      // original mbuf passed in
969         uta_ctx_t*      ctx;
970         uta_mhdr_t*     hdr;                    // header in the transport buffer
971         chute_t*        chute;
972         unsigned char*  d1;                     // d1 data in header
973         struct timespec ts;                     // time info if we have a timeout
974         long    new_ms;                         // adjusted mu-sec
975         long    seconds = 0;            // max wait seconds
976         long    nano_sec;                       // max wait xlated to nano seconds
977         int             state;
978
979         errno = EINVAL;
980         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
981                 if( mbuf ) {
982                         mbuf->tp_state = errno;
983                         mbuf->state = RMR_ERR_BADARG;
984                 }
985                 return mbuf;
986         }
987
988         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
989                 mbuf->state = RMR_ERR_NOTSUPP;
990                 mbuf->tp_state = errno;
991                 return mbuf;
992         }
993
994         ombuf = mbuf;                                                                                                   // save to return timeout status with
995
996         chute = &ctx->chutes[call_id];
997         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
998                 rmr_free_msg( chute->mbuf );
999                 chute->mbuf = NULL;
1000         }
1001
1002         hdr = (uta_mhdr_t *) mbuf->header;
1003         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1004         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1005         d1 = DATA1_ADDR( hdr );
1006         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1007         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1008
1009         if( max_wait >= 0 ) {
1010                 clock_gettime( CLOCK_REALTIME, &ts );
1011
1012                 if( max_wait > 999 ) {
1013                         seconds = max_wait / 1000;
1014                         max_wait -= seconds * 1000;
1015                         ts.tv_sec += seconds;
1016                 }
1017                 if( max_wait > 0 ) {
1018                         nano_sec = max_wait * 1000000;
1019                         ts.tv_nsec += nano_sec;
1020                         if( ts.tv_nsec > 999999999 ) {
1021                                 ts.tv_nsec -= 999999999;
1022                                 ts.tv_sec++;
1023                         }
1024                 }
1025
1026                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1027         }
1028
1029         if( ep == NULL ) {                                                                              // normal routing
1030                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1031         } else {
1032                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1033         }
1034         if( mbuf ) {
1035                 if( mbuf->state != RMR_OK ) {
1036                         mbuf->tp_state = errno;
1037                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1038                 }
1039         }
1040
1041         state = 0;
1042         errno = 0;
1043         while( chute->mbuf == NULL && ! errno ) {
1044                 if( seconds ) {
1045                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1046                 } else {
1047                         state = sem_wait( &chute->barrier );
1048                 }
1049
1050                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1051                         errno = 0;
1052                 }
1053
1054                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1055                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1056                                 rmr_free_msg( chute->mbuf );
1057                                 chute->mbuf = NULL;
1058                                 errno = 0;
1059                         }
1060                 }
1061         }
1062
1063         if( state < 0 ) {
1064                 return NULL;                                    // leave errno as set by sem wait call
1065         }
1066
1067         mbuf = chute->mbuf;
1068         if( mbuf != NULL ) {
1069                 mbuf->state = RMR_OK;
1070         }
1071         chute->mbuf = NULL;
1072
1073         return mbuf;
1074 }
1075
1076 /*
1077         Accept a message buffer and caller ID, send the message and then wait
1078         for the receiver to tickle the semaphore letting us know that a message
1079         has been received. The call_id is a value between 2 and 255, inclusive; if
1080         it's not in this range an error will be returned. Max wait is the amount
1081         of time in millaseconds that the call should block for. If 0 is given
1082         then no timeout is set.
1083
1084         If the mt_call feature has not been initialised, then the attempt to use this
1085         funciton will fail with RMR_ERR_NOTSUPP
1086
1087         If no matching message is received before the max_wait period expires, a
1088         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1089         occurs after the message has been sent, then a nil pointer is returned
1090         with errno set to some other value.
1091
1092         This is now just an outward facing wrapper so we can support wormhole calls.
1093 */
1094 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1095
1096         // must vet call_id here, all others vetted by workhorse mt_call() function
1097         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1098                 if( mbuf != NULL ) {
1099                         mbuf->state = RMR_ERR_BADARG;
1100                         mbuf->tp_state = EINVAL;
1101                 }
1102                 return mbuf;
1103         }
1104
1105         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1106 }
1107
1108
1109 /*
1110         Given an existing message buffer, reallocate the payload portion to
1111         be at least new_len bytes.  The message header will remain such that
1112         the caller may use the rmr_rts_msg() function to return a payload
1113         to the sender.
1114
1115         The mbuf passed in may or may not be reallocated and the caller must
1116         use the returned pointer and should NOT assume that it can use the
1117         pointer passed in with the exceptions based on the clone flag.
1118
1119         If the clone flag is set, then a duplicated message, with larger payload
1120         size, is allocated and returned.  The old_msg pointer in this situation is
1121         still valid and must be explicitly freed by the application. If the clone
1122         message is not set (0), then any memory management of the old message is
1123         handled by the function.
1124
1125         If the copy flag is set, the contents of the old message's payload is
1126         copied to the reallocated payload.  If the flag is not set, then the
1127         contents of the payload is undetermined.
1128 */
1129 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1130         if( old_msg == NULL ) {
1131                 return NULL;
1132         }
1133
1134         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1135 }
1136
1137 /*
1138         Enable low latency things in the transport (when supported).
1139 */
1140 extern void rmr_set_low_latency( void* vctx ) {
1141         uta_ctx_t*      ctx;
1142
1143         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1144                 if( ctx->si_ctx != NULL ) {
1145                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1146                 }
1147         }
1148 }
1149
1150 /*
1151         Turn on fast acks.
1152 */
1153 extern void rmr_set_fack( void* vctx ) {
1154         uta_ctx_t*      ctx;
1155
1156         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1157                 if( ctx->si_ctx != NULL ) {
1158                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1159                 }
1160         }
1161 }
1162