Correct potenital locking issue in msg allocation
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "rtc_static.c"                         // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c"                       // send/receive static functions
73 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
76
77
78 //------------------------------------------------------------------------------
79
80
81 /*
82         Clean up a context.
83 */
84 static void free_ctx( uta_ctx_t* ctx ) {
85         if( ctx ) {
86                 if( ctx->rtg_addr ){
87                         free( ctx->rtg_addr );
88                 }
89                 uta_ring_free( ctx->mring );
90                 uta_ring_free( ctx->zcb_mring );
91                 if( ctx->chutes ){
92                         free( ctx->chutes );
93                 }
94                 if( ctx->fd2ep ){
95                         rmr_sym_free( ctx->fd2ep );
96                 }
97                 if( ctx->my_name ){
98                         free( ctx->my_name );
99                 }
100                 if( ctx->my_ip ){
101                         free( ctx->my_ip );
102                 }
103                 if( ctx->rtable ){
104                         rmr_sym_free( ctx->rtable->hash );
105                         free( ctx->rtable );
106                 }
107                 if ( ctx->ephash ){
108                         free( ctx->ephash );
109                 }
110                 free( ctx );
111         }
112 }
113
114 // --------------- public functions --------------------------------------------------------------------------
115
116 /*
117         Returns the size of the payload (bytes) that the msg buffer references.
118         Len in a message is the number of bytes which were received, or should
119         be transmitted, however, it is possible that the mbuf was allocated
120         with a larger payload space than the payload length indicates; this
121         function returns the absolute maximum space that the user has available
122         in the payload. On error (bad msg buffer) -1 is returned and errno should
123         indicate the rason.
124
125         The allocated len stored in the msg is:
126                 transport header length +
127                 message header +
128                 user requested payload
129
130         The msg header is a combination of the fixed RMR header and the variable
131         trace data and d2 fields which may vary for each message.
132 */
133 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
134         if( msg == NULL || msg->header == NULL ) {
135                 errno = EINVAL;
136                 return -1;
137         }
138
139         errno = 0;
140         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
141 }
142
143 /*
144         Allocates a send message as a zerocopy message allowing the underlying message protocol
145         to send the buffer without copy.
146 */
147 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
148         uta_ctx_t*      ctx;
149         rmr_mbuf_t*     m;
150
151         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
152                 return NULL;
153         }
154
155         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
156         return  m;
157 }
158
159
160 /*
161         Allocates a send message as a zerocopy message allowing the underlying message protocol
162         to send the buffer without copy. In addition, a trace data field of tr_size will be
163         added and the supplied data coppied to the buffer before returning the message to
164         the caller.
165 */
166 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
167         uta_ctx_t*      ctx;
168         rmr_mbuf_t*     m;
169         int state;
170
171         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
172                 return NULL;
173         }
174
175         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
176         if( m != NULL ) {
177                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
178                 if( state != tr_size ) {
179                         m->state = RMR_ERR_INITFAILED;
180                 }
181         }
182
183         return  m;
184 }
185
186 /*
187         This provides an external path to the realloc static function as it's called by an
188         outward facing mbuf api function. Used to reallocate a message with a different
189         trace data size.
190 */
191 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
192         return realloc_msg( msg, new_tr_size );
193 }
194
195
196 /*
197         Return the message to the available pool, or free it outright.
198 */
199 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
200         if( mbuf == NULL ) {
201                 return;
202         }
203
204         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
205                 ! mbuf->ring ||                                                                         // cant cache if no ring
206                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
207
208                 if( mbuf->tp_buf ) {
209                         free( mbuf->tp_buf );
210                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
211                 }
212
213                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
214                 free( mbuf );
215         }
216 }
217
218 /*
219         This is a wrapper to the real timeout send. We must wrap it now to ensure that
220         the call flag and call-id are reset
221 */
222 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
223         char* d1;                                                                                                                       // point at the call-id in the header
224
225         if( msg != NULL ) {
226                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
227
228                 d1 = DATA1_ADDR( msg->header );
229                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
230         }
231
232         return mtosend_msg( vctx, msg, max_to );
233 }
234
235 /*
236         Send with default max timeout as is set in the context.
237         See rmr_mtosend_msg() for more details on the parameters.
238         See rmr_stimeout() for info on setting the default timeout.
239 */
240 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
241         char* d1;                                                                                                               // point at the call-id in the header
242
243         if( msg != NULL ) {
244                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
245
246                 d1 = DATA1_ADDR( msg->header );
247                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
248         }
249
250         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
251 }
252
253 /*
254         Return to sender allows a message to be sent back to the endpoint where it originated.
255
256         With SI95 it was thought that the return to sender would be along the same open conneciton
257         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
258         applications sending at high message rates, returning responses on the same connection
259         causes major strife. Thus the decision was made to use the same method as NNG and just
260         open a second connection for reverse path.
261
262         We will attempt to use the name in the received message to look up the endpoint. If
263         that failes, then we will write on the connection that the message arrived on as a
264         falback.
265
266         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
267         and on error it can be passed back to this function to retry the send if desired. On error,
268         errno will liklely have the failure reason set by the nng send processing.  The following
269         are possible values for the state in the message buffer:
270
271         Message states returned:
272                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
273                 RMR_ERR_NOHDR  - message did not have a header
274                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
275                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
276                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
277
278         A nil message as the return value is rare, and generally indicates some kind of horrible
279         failure. The value of errno might give a clue as to what is wrong.
280
281         CAUTION:
282                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
283                 The caller must check for this and handle it properly.
284 */
285 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
286         int                     nn_sock;                        // endpoint socket for send
287         uta_ctx_t*      ctx;
288         char*           hold_src;                       // we need the original source if send fails
289         char*           hold_ip;                        // also must hold original ip
290         int                     sock_ok = 0;            // true if we found a valid endpoint socket
291         endpoint_t*     ep = NULL;                      // end point to track counts
292
293         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
294                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
295                 if( msg != NULL ) {
296                         msg->state = RMR_ERR_BADARG;
297                         msg->tp_state = errno;
298                 }
299                 return msg;
300         }
301
302         errno = 0;                                                                                                              // at this point any bad state is in msg returned
303         if( msg->header == NULL ) {
304                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
305                 msg->state = RMR_ERR_NOHDR;
306                 msg->tp_state = errno;
307                 return msg;
308         }
309
310         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
311
312         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
313         if( ! sock_ok ) {
314                 if( (nn_sock = msg->rts_fd) < 0 ) {
315                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
316                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
317                         }
318                         if( ! sock_ok ) {
319                                 msg->state = RMR_ERR_NOENDPT;
320                                 return msg;
321                         }
322                 }
323         }
324
325
326         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
327         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
328         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
329         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
330         msg = send_msg( ctx, msg, nn_sock, -1 );
331         if( msg ) {
332                 if( ep != NULL ) {
333                         switch( msg->state ) {
334                                 case RMR_OK:
335                                         ep->scounts[EPSC_GOOD]++;
336                                         break;
337
338                                 case RMR_ERR_RETRY:
339                                         ep->scounts[EPSC_TRANS]++;
340                                         break;
341
342                                 default:
343                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
344                                         ep->scounts[EPSC_FAIL]++;
345                                         break;
346                         }
347                 }
348                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
349                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
350                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
351         }
352
353         free( hold_src );
354         free( hold_ip );
355         return msg;
356 }
357
358 /*
359         If multi-threading call is turned on, this invokes that mechanism with the special call
360         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
361         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
362         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
363         a flexible timeout.
364
365         On timeout this function will return a nil pointer. If the original message could not
366         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
367
368         Original behavour:
369         Call sends the message based on message routing using the message type, and waits for a
370         response message to arrive with the same transaction id that was in the outgoing message.
371         If, while wiating for the expected response,  messages are received which do not have the
372         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
373         order that they were received.
374
375         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
376         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
377         may be resent (likely the context pointer was nil).  If the message is sent, but no
378         response is received, a nil message is returned with errno set to indicate the likley
379         issue:
380                 ETIMEDOUT -- too many messages were queued before reciving the expected response
381                 ENOBUFS -- the queued message ring is full, messages were dropped
382                 EINVAL  -- A parameter was not valid
383                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
384                                         user should call this function with the message again.
385
386 */
387 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
388         uta_ctx_t*              ctx;
389
390         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
391                 if( msg != NULL ) {
392                         msg->state = RMR_ERR_BADARG;
393                 }
394                 return msg;
395         }
396
397     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
398 }
399
400 /*
401         The outward facing receive function. When invoked it will pop the oldest message
402         from the receive ring, if any are queued, and return it. If the ring is empty
403         then the receive function is invoked to wait for the next message to arrive (blocking).
404
405         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
406         nil, a new one will be allocated. However, the caller should NOT expect to get the same
407         struct back (if a queued message is returned the message struct will be different).
408 */
409 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
410         uta_ctx_t*      ctx;
411         rmr_mbuf_t*     qm;                             // message that was queued on the ring
412
413         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
414                 errno = EINVAL;
415                 if( old_msg != NULL ) {
416                         old_msg->state = RMR_ERR_BADARG;
417                         old_msg->tp_state = errno;
418                 }
419                 return old_msg;
420         }
421         errno = 0;
422
423         return rmr_mt_rcv( ctx, old_msg, -1 );
424 }
425
426 /*
427         This allows a timeout based receive for applications unable to implement epoll_wait()
428         (e.g. wrappers).
429 */
430 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
431         uta_ctx_t*      ctx;
432
433         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
434                 errno = EINVAL;
435                 if( old_msg != NULL ) {
436                         old_msg->state = RMR_ERR_BADARG;
437                         old_msg->tp_state = errno;
438                 }
439                 return old_msg;
440         }
441
442         return rmr_mt_rcv( ctx, old_msg, ms_to );
443 }
444
445 /*
446         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
447                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
448                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
449
450         This blocks until the message with the 'expect' ID is received. Messages which are received
451         before the expected message are queued onto the message ring.  The function will return
452         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
453         expected message is received. If the queued message ring fills a nil pointer is returned
454         and errno is set to ENOBUFS.
455
456         Generally this will be invoked only by the call() function as it waits for a response, but
457         it is exposed to the user application as three is no reason not to.
458 */
459 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
460         uta_ctx_t*      ctx;
461         int     queued = 0;                             // number we pushed into the ring
462         int     exp_len = 0;                    // length of expected ID
463
464         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
465                 errno = EINVAL;
466                 if( msg != NULL ) {
467                         msg->state = RMR_ERR_BADARG;
468                         msg->tp_state = errno;
469                 }
470                 return msg;
471         }
472
473         errno = 0;
474
475         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
476                 return rmr_rcv_msg( ctx, msg );
477         }
478
479         exp_len = strlen( expect );
480         if( exp_len > RMR_MAX_XID ) {
481                 exp_len = RMR_MAX_XID;
482         }
483         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
484
485         while( queued < allow2queue ) {
486                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
487                 if( msg != NULL ) {
488                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
489                         if( msg->state == RMR_OK ) {
490                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
491                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
492                                         return msg;
493                                 }
494
495                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
496                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
497                                         errno = ENOBUFS;
498                                         return NULL;
499                                 }
500
501                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
502                                 queued++;
503                                 msg = NULL;
504                         }
505                 }
506         }
507
508         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
509         errno = ETIMEDOUT;
510         return NULL;
511 }
512
513 /*
514         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
515         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
516         mechnism indicates eagain or etimeedout.  All other error conditions are reported
517         without this delay. Setting a timeout of 0 causes no retries to be attempted in
518         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
519         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
520         after every 1K send attempts until the "time" value is reached. Retries are abandoned
521         if NNG returns anything other than EAGAIN or EINTER is returned.
522
523         The default, if this function is not used, is 1; meaning that RMr will retry, but will
524         not enter a sleep.  In all cases the caller should check the status in the message returned
525         after a send call.
526
527         Returns -1 if the context was invalid; RMR_OK otherwise.
528 */
529 extern int rmr_set_stimeout( void* vctx, int time ) {
530         uta_ctx_t*      ctx;
531
532         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
533                 return -1;
534         }
535
536         if( time < 0 ) {
537                 time = 0;
538         }
539
540         ctx->send_retries = time;
541         return RMR_OK;
542 }
543
544 /*
545         Set receive timeout -- not supported in nng implementation
546
547         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
548 */
549 extern int rmr_set_rtimeout( void* vctx, int time ) {
550         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
551         return 0;
552 }
553
554
555 /*
556         This is the actual init workhorse. The user visible function meerly ensures that the
557         calling programme does NOT set any internal flags that are supported, and then
558         invokes this.  Internal functions (the route table collector) which need additional
559         open ports without starting additional route table collectors, will invoke this
560         directly with the proper flag.
561
562         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
563                                 that we know about. The _user_ should ensure that the supplied length also
564                                 includes the trace data length maximum as they are in control of that.
565 */
566 static void* init(  char* uproto_port, int def_msg_size, int flags ) {
567         static  int announced = 0;
568         uta_ctx_t*      ctx = NULL;
569         char    bind_info[256];                         // bind info
570         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
571         char*   port;                                           // pointer into the proto_port buffer at the port value
572         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
573         char*   proto_port;
574         char    wbuf[1024];                                     // work buffer
575         char*   tok;                                            // pointer at token in a buffer
576         char*   tok2;
577         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
578         int             state;
579         int             i;
580         int             old_vlevel;
581
582         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
583
584         if( ! announced ) {
585                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
586                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
587                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
588                 announced = 1;
589
590                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
591                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
592         }
593
594         errno = 0;
595         if( uproto_port == NULL ) {
596                 proto_port = strdup( DEF_COMM_PORT );
597         } else {
598                 proto_port = strdup( uproto_port );             // so we can modify it
599         }
600
601         if ( proto_port == NULL ){
602                 errno = ENOMEM;
603                 return NULL;
604         }
605
606         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
607                 errno = ENOMEM;
608                 goto err;
609         }
610         memset( ctx, 0, sizeof( uta_ctx_t ) );
611
612         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
613         ctx->nrivers = 256;                                                             // number of input flows we'll manage
614         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
615         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
616         for( i = 0; i < ctx->nrivers; i++ ) {
617                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
618         }
619
620         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
621         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
622         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
623         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
624
625         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
626         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
627
628         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
629                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
630                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
631                 uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
632         } else {
633                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
634         }
635         init_mtcall( ctx );                                                             // set up call chutes
636         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
637
638
639         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
640         if( def_msg_size > 0 ) {
641                 ctx->max_plen = def_msg_size;
642         }
643
644         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
645         if( ctx->si_ctx == NULL ) {
646                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
647                 goto err;
648         }
649
650         if( (port = strchr( proto_port, ':' )) != NULL ) {
651                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
652                         port++;
653                 } else {
654                         *(port++) = 0;                  // term proto string and point at port string
655                         proto = proto_port;             // user supplied proto so point at it rather than default
656                 }
657         } else {
658                 port = proto_port;                      // assume something like "1234" was passed
659         }
660
661         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
662                 static_rtc = 1;
663         }
664
665         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
666                 tok = strdup( tok );                                    // something we can destroy
667                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
668                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
669                 } else {
670                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
671                 }
672                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
673                         *tok2 = 0;                                                      // make it so
674                 }
675
676                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
677                 free( tok );
678         } else {
679                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
680                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
681                         goto err;
682                 }
683                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
684                         *tok = 0;                                                                       // we don't keep domain portion
685                 }
686         }
687
688         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
689         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
690                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
691                 errno = EINVAL;
692                 goto err;
693         }
694
695         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
696                 if( atoi( tok ) > 0 ) {
697                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
698                 }
699         }
700
701         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
702         if( flags & RMRFL_NAME_ONLY ) {
703                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
704         } else {
705                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
706                 if( ctx->my_ip == NULL ) {
707                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
708                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
709                 }
710         }
711         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
712
713         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
714                 if( *tok == '1' ) {
715                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
716                 }
717         }
718
719
720         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
721                 interface = "0.0.0.0";
722         }
723
724         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
725         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
726                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
727                 goto err;
728         }
729
730                                                                                                 // finish all flag setting before threads to keep helgrind quiet
731         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
732
733
734         // ---------------- setup for route table collector before invoking ----------------------------------
735         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
736         if( ctx->rtgate != NULL ) {
737                 pthread_mutex_init( ctx->rtgate, NULL );
738         }
739
740         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
741         if( ctx->ephash == NULL ) {
742                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
743                 errno = ENOMEM;
744                 goto err;
745         }
746
747         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
748         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
749                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
750         } else {
751                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
752
753                 if( static_rtc ) {
754                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
755                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
756                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
757                         }
758                 } else {
759                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
760                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
761                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
762                         }
763                 }
764         }
765
766         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
767                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
768         }
769
770         free( proto_port );
771         return (void *) ctx;
772
773 err:
774         free( proto_port );
775         free_ctx( ctx );
776         return NULL;
777 }
778
779 /*
780         Initialise the message routing environment. Flags are one of the UTAFL_
781         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
782         (tcp) to be used, then :port is all that is needed.
783
784         At the moment it seems that TCP really is the only viable protocol, but
785         we'll allow flexibility.
786
787         The return value is a void pointer which must be passed to most uta functions. On
788         error, a nil pointer is returned and errno should be set.
789
790         Flags:
791                 No user flags supported (needed) at the moment, but this provides for extension
792                 without drastically changing anything. The user should invoke with RMRFL_NONE to
793                 avoid any misbehavour as there are internal flags which are suported
794 */
795 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
796         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
797 }
798
799 /*
800         This sets the default trace length which will be added to any message buffers
801         allocated.  It can be set at any time, and if rmr_set_trace() is given a
802         trace len that is different than the default allcoated in a message, the message
803         will be resized.
804
805         Returns 0 on failure and 1 on success. If failure, then errno will be set.
806 */
807 extern int rmr_init_trace( void* vctx, int tr_len ) {
808         uta_ctx_t* ctx;
809
810         errno = 0;
811         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
812                 errno = EINVAL;
813                 return 0;
814         }
815
816         ctx->trace_data_len = tr_len;
817         return 1;
818 }
819
820 /*
821         Return true if routing table is initialised etc. and app can send/receive.
822 */
823 extern int rmr_ready( void* vctx ) {
824         uta_ctx_t *ctx;
825
826         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
827                 return FALSE;
828         }
829
830         return ctx->rtable_ready;
831 }
832
833 /*
834         This returns the message queue ring's filedescriptor which can be used for
835         calls to epoll.  The user shouild NOT read, write, or close the fd.
836
837         Returns the file descriptor or -1 on error.
838 */
839 extern int rmr_get_rcvfd( void* vctx ) {
840         uta_ctx_t* ctx;
841         int state;
842
843         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
844                 return -1;
845         }
846
847         return uta_ring_getpfd( ctx->mring );
848 }
849
850
851 /*
852         Clean up things.
853
854         There isn't an si_flush() per se, but we can pause, generate
855         a context switch, which should allow the last sent buffer to
856         flow. There isn't exactly an nng_term/close either, so there
857         isn't much we can do.
858 */
859 extern void rmr_close( void* vctx ) {
860         uta_ctx_t *ctx;
861
862         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
863                 return;
864         }
865
866         ctx->shutdown = 1;
867
868         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
869
870         // FIX ME -- how to we turn off si; close all sessions etc?
871         //SIclose( ctx->nn_sock );
872
873 }
874
875
876 // ----- multi-threaded call/receive support -------------------------------------------------
877
878 /*
879         Blocks on the receive ring chute semaphore and then reads from the ring
880         when it is tickled.  If max_wait is -1 then the function blocks until
881         a message is ready on the ring. Else max_wait is assumed to be the number
882         of millaseconds to wait before returning a timeout message.
883 */
884 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
885         uta_ctx_t*      ctx;
886         chute_t*        chute;
887         struct timespec ts;                     // time info if we have a timeout
888         long    new_ms;                         // adjusted mu-sec
889         long    seconds = 0;            // max wait seconds
890         long    nano_sec;                       // max wait xlated to nano seconds
891         int             state;
892         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
893
894         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
895                 errno = EINVAL;
896                 if( mbuf ) {
897                         mbuf->state = RMR_ERR_BADARG;
898                         mbuf->tp_state = errno;
899                 }
900                 return mbuf;
901         }
902
903         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
904
905         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
906
907         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
908                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
909                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
910                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
911                         if( ombuf ) {
912                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
913                         }
914                 } else {
915                         mbuf = ombuf;                                           // return original if it was given with timeout status
916                         if( ombuf != NULL ) {
917                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
918                                 mbuf->len = 0;
919                         }
920                 }
921
922                 if( mbuf != NULL ) {
923                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
924                 }
925
926                 return mbuf;
927         }
928
929         if( ombuf ) {
930                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
931                 ombuf->len = 0;
932         }
933         if( max_wait > 0 ) {
934                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
935
936                 if( max_wait > 999 ) {
937                         seconds = max_wait / 1000;
938                         max_wait -= seconds * 1000;
939                         ts.tv_sec += seconds;
940                 }
941                 if( max_wait > 0 ) {
942                         nano_sec = max_wait * 1000000;
943                         ts.tv_nsec += nano_sec;
944                         if( ts.tv_nsec > 999999999 ) {
945                                 ts.tv_nsec -= 999999999;
946                                 ts.tv_sec++;
947                         }
948                 }
949
950                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
951         }
952
953         errno = EINTR;
954         state = -1;
955         while( state < 0 && errno == EINTR ) {
956                 if( seconds ) {
957                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
958                 } else {
959                         state = sem_wait( &chute->barrier );
960                 }
961         }
962
963         if( state < 0 ) {
964                 mbuf = ombuf;                           // return caller's buffer if they passed one in
965         } else {
966                 errno = 0;                                              // interrupted call state could be left; clear
967                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
968                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
969                         mbuf->state = RMR_OK;
970                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
971
972                         if( ombuf ) {
973                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
974                         }
975                 } else {
976                         errno = ETIMEDOUT;
977                         mbuf = ombuf;                           // no buffer, return user's if there
978                 }
979         }
980
981         if( mbuf ) {
982                 mbuf->tp_state = errno;
983         }
984         return mbuf;
985 }
986
987
988
989
990 /*
991         This is the work horse for the multi-threaded call() function. It supports
992         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
993         for for rmr_mt_call() modulo the caveat below.
994
995         If endpoint is given, then we assume that we're not doing normal route table
996         routing and that we should send directly to that endpoint (probably worm
997         hole).
998 */
999 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1000         rmr_mbuf_t* ombuf;                      // original mbuf passed in
1001         uta_ctx_t*      ctx;
1002         uta_mhdr_t*     hdr;                    // header in the transport buffer
1003         chute_t*        chute;
1004         unsigned char*  d1;                     // d1 data in header
1005         struct timespec ts;                     // time info if we have a timeout
1006         long    new_ms;                         // adjusted mu-sec
1007         long    seconds = 0;            // max wait seconds
1008         long    nano_sec;                       // max wait xlated to nano seconds
1009         int             state;
1010
1011         errno = EINVAL;
1012         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1013                 if( mbuf ) {
1014                         mbuf->tp_state = errno;
1015                         mbuf->state = RMR_ERR_BADARG;
1016                 }
1017                 return mbuf;
1018         }
1019
1020         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1021                 mbuf->state = RMR_ERR_NOTSUPP;
1022                 mbuf->tp_state = errno;
1023                 return mbuf;
1024         }
1025
1026         ombuf = mbuf;                                                                                                   // save to return timeout status with
1027
1028         chute = &ctx->chutes[call_id];
1029         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1030                 rmr_free_msg( chute->mbuf );
1031                 chute->mbuf = NULL;
1032         }
1033
1034         hdr = (uta_mhdr_t *) mbuf->header;
1035         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1036         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1037         d1 = DATA1_ADDR( hdr );
1038         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1039         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1040
1041         if( max_wait >= 0 ) {
1042                 clock_gettime( CLOCK_REALTIME, &ts );
1043
1044                 if( max_wait > 999 ) {
1045                         seconds = max_wait / 1000;
1046                         max_wait -= seconds * 1000;
1047                         ts.tv_sec += seconds;
1048                 }
1049                 if( max_wait > 0 ) {
1050                         nano_sec = max_wait * 1000000;
1051                         ts.tv_nsec += nano_sec;
1052                         if( ts.tv_nsec > 999999999 ) {
1053                                 ts.tv_nsec -= 999999999;
1054                                 ts.tv_sec++;
1055                         }
1056                 }
1057
1058                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1059         }
1060
1061         if( ep == NULL ) {                                                                              // normal routing
1062                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1063         } else {
1064                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1065         }
1066         if( mbuf ) {
1067                 if( mbuf->state != RMR_OK ) {
1068                         mbuf->tp_state = errno;
1069                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1070                 }
1071         }
1072
1073         state = 0;
1074         errno = 0;
1075         while( chute->mbuf == NULL && ! errno ) {
1076                 if( seconds ) {
1077                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1078                 } else {
1079                         state = sem_wait( &chute->barrier );
1080                 }
1081
1082                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1083                         errno = 0;
1084                 }
1085
1086                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1087                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1088                                 rmr_free_msg( chute->mbuf );
1089                                 chute->mbuf = NULL;
1090                                 errno = 0;
1091                         }
1092                 }
1093         }
1094
1095         if( state < 0 ) {
1096                 return NULL;                                    // leave errno as set by sem wait call
1097         }
1098
1099         mbuf = chute->mbuf;
1100         if( mbuf != NULL ) {
1101                 mbuf->state = RMR_OK;
1102         }
1103         chute->mbuf = NULL;
1104
1105         return mbuf;
1106 }
1107
1108 /*
1109         Accept a message buffer and caller ID, send the message and then wait
1110         for the receiver to tickle the semaphore letting us know that a message
1111         has been received. The call_id is a value between 2 and 255, inclusive; if
1112         it's not in this range an error will be returned. Max wait is the amount
1113         of time in millaseconds that the call should block for. If 0 is given
1114         then no timeout is set.
1115
1116         If the mt_call feature has not been initialised, then the attempt to use this
1117         funciton will fail with RMR_ERR_NOTSUPP
1118
1119         If no matching message is received before the max_wait period expires, a
1120         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1121         occurs after the message has been sent, then a nil pointer is returned
1122         with errno set to some other value.
1123
1124         This is now just an outward facing wrapper so we can support wormhole calls.
1125 */
1126 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1127
1128         // must vet call_id here, all others vetted by workhorse mt_call() function
1129         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1130                 if( mbuf != NULL ) {
1131                         mbuf->state = RMR_ERR_BADARG;
1132                         mbuf->tp_state = EINVAL;
1133                 }
1134                 return mbuf;
1135         }
1136
1137         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1138 }
1139
1140
1141 /*
1142         Given an existing message buffer, reallocate the payload portion to
1143         be at least new_len bytes.  The message header will remain such that
1144         the caller may use the rmr_rts_msg() function to return a payload
1145         to the sender.
1146
1147         The mbuf passed in may or may not be reallocated and the caller must
1148         use the returned pointer and should NOT assume that it can use the
1149         pointer passed in with the exceptions based on the clone flag.
1150
1151         If the clone flag is set, then a duplicated message, with larger payload
1152         size, is allocated and returned.  The old_msg pointer in this situation is
1153         still valid and must be explicitly freed by the application. If the clone
1154         message is not set (0), then any memory management of the old message is
1155         handled by the function.
1156
1157         If the copy flag is set, the contents of the old message's payload is
1158         copied to the reallocated payload.  If the flag is not set, then the
1159         contents of the payload is undetermined.
1160 */
1161 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1162         if( old_msg == NULL ) {
1163                 return NULL;
1164         }
1165
1166         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1167 }
1168
1169 /*
1170         Enable low latency things in the transport (when supported).
1171 */
1172 extern void rmr_set_low_latency( void* vctx ) {
1173         uta_ctx_t*      ctx;
1174
1175         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1176                 if( ctx->si_ctx != NULL ) {
1177                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1178                 }
1179         }
1180 }
1181
1182 /*
1183         Turn on fast acks.
1184 */
1185 extern void rmr_set_fack( void* vctx ) {
1186         uta_ctx_t*      ctx;
1187
1188         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1189                 if( ctx->si_ctx != NULL ) {
1190                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1191                 }
1192         }
1193 }
1194