Ensure RT incremental update not applied early
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "alarm.c"
71 #include "rtc_static.c"                         // route table collector (thread code)
72 #include "tools_static.c"
73 #include "sr_si_static.c"                       // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_si_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ){
88                         free( ctx->rtg_addr );
89                 }
90                 uta_ring_free( ctx->mring );
91                 uta_ring_free( ctx->zcb_mring );
92                 if( ctx->chutes ){
93                         free( ctx->chutes );
94                 }
95                 if( ctx->fd2ep ){
96                         rmr_sym_free( ctx->fd2ep );
97                 }
98                 if( ctx->my_name ){
99                         free( ctx->my_name );
100                 }
101                 if( ctx->my_ip ){
102                         free( ctx->my_ip );
103                 }
104                 if( ctx->rtable ){
105                         rmr_sym_free( ctx->rtable->hash );
106                         free( ctx->rtable );
107                 }
108                 if ( ctx->ephash ){
109                         free( ctx->ephash );
110                 }
111                 free( ctx );
112         }
113 }
114
115 // --------------- public functions --------------------------------------------------------------------------
116
117 /*
118         Returns the size of the payload (bytes) that the msg buffer references.
119         Len in a message is the number of bytes which were received, or should
120         be transmitted, however, it is possible that the mbuf was allocated
121         with a larger payload space than the payload length indicates; this
122         function returns the absolute maximum space that the user has available
123         in the payload. On error (bad msg buffer) -1 is returned and errno should
124         indicate the rason.
125
126         The allocated len stored in the msg is:
127                 transport header length +
128                 message header +
129                 user requested payload
130
131         The msg header is a combination of the fixed RMR header and the variable
132         trace data and d2 fields which may vary for each message.
133 */
134 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
135         if( msg == NULL || msg->header == NULL ) {
136                 errno = EINVAL;
137                 return -1;
138         }
139
140         errno = 0;
141         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
142 }
143
144 /*
145         Allocates a send message as a zerocopy message allowing the underlying message protocol
146         to send the buffer without copy.
147 */
148 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
149         uta_ctx_t*      ctx;
150         rmr_mbuf_t*     m;
151
152         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
153                 return NULL;
154         }
155
156         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
157         return  m;
158 }
159
160
161 /*
162         Allocates a send message as a zerocopy message allowing the underlying message protocol
163         to send the buffer without copy. In addition, a trace data field of tr_size will be
164         added and the supplied data coppied to the buffer before returning the message to
165         the caller.
166 */
167 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
168         uta_ctx_t*      ctx;
169         rmr_mbuf_t*     m;
170         int state;
171
172         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
173                 return NULL;
174         }
175
176         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
177         if( m != NULL ) {
178                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
179                 if( state != tr_size ) {
180                         m->state = RMR_ERR_INITFAILED;
181                 }
182         }
183
184         return  m;
185 }
186
187 /*
188         This provides an external path to the realloc static function as it's called by an
189         outward facing mbuf api function. Used to reallocate a message with a different
190         trace data size.
191
192         User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
193         is still valid following this call. The caller is reponsible for maintainting
194         a pointer to both old and new messages and invoking rmr_free_msg() on both!
195 */
196 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
197         return realloc_msg( msg, new_tr_size );
198 }
199
200
201 /*
202         Return the message to the available pool, or free it outright.
203 */
204 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
205         if( mbuf == NULL ) {
206                 fprintf( stderr, ">>>FREE  nil buffer\n" );
207                 return;
208         }
209
210 #ifdef KEEP
211         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
212                 ! mbuf->ring ||                                                                         // cant cache if no ring
213                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
214
215                 if( mbuf->tp_buf ) {
216                         free( mbuf->tp_buf );
217                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
218                 }
219
220                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
221                 free( mbuf );
222         }
223 #else
224         // always free, never manage a pool
225         if( mbuf->tp_buf ) {
226                 free( mbuf->tp_buf );
227                 mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
228         }
229
230         mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
231         free( mbuf );
232 #endif
233 }
234
235 /*
236         This is a wrapper to the real timeout send. We must wrap it now to ensure that
237         the call flag and call-id are reset
238 */
239 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
240         char* d1;                                                                                                                       // point at the call-id in the header
241
242         if( msg != NULL ) {
243                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
244
245                 d1 = DATA1_ADDR( msg->header );
246                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
247         }
248
249         return mtosend_msg( vctx, msg, max_to );
250 }
251
252 /*
253         Send with default max timeout as is set in the context.
254         See rmr_mtosend_msg() for more details on the parameters.
255         See rmr_stimeout() for info on setting the default timeout.
256 */
257 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
258         char* d1;                                                                                                               // point at the call-id in the header
259
260         if( msg != NULL ) {
261                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
262
263                 d1 = DATA1_ADDR( msg->header );
264                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
265         }
266
267         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
268 }
269
270 /*
271         Return to sender allows a message to be sent back to the endpoint where it originated.
272
273         With SI95 it was thought that the return to sender would be along the same open conneciton
274         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
275         applications sending at high message rates, returning responses on the same connection
276         causes major strife. Thus the decision was made to use the same method as NNG and just
277         open a second connection for reverse path.
278
279         We will attempt to use the name in the received message to look up the endpoint. If
280         that failes, then we will write on the connection that the message arrived on as a
281         falback.
282
283         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
284         and on error it can be passed back to this function to retry the send if desired. On error,
285         errno will liklely have the failure reason set by the nng send processing.  The following
286         are possible values for the state in the message buffer:
287
288         Message states returned:
289                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
290                 RMR_ERR_NOHDR  - message did not have a header
291                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
292                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
293                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
294
295         A nil message as the return value is rare, and generally indicates some kind of horrible
296         failure. The value of errno might give a clue as to what is wrong.
297
298         CAUTION:
299                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
300                 The caller must check for this and handle it properly.
301 */
302 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
303         int                     nn_sock;                        // endpoint socket for send
304         uta_ctx_t*      ctx;
305         char*           hold_src;                       // we need the original source if send fails
306         char*           hold_ip;                        // also must hold original ip
307         int                     sock_ok = 0;            // true if we found a valid endpoint socket
308         endpoint_t*     ep = NULL;                      // end point to track counts
309
310         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
311                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
312                 if( msg != NULL ) {
313                         msg->state = RMR_ERR_BADARG;
314                         msg->tp_state = errno;
315                 }
316                 return msg;
317         }
318
319         errno = 0;                                                                                                              // at this point any bad state is in msg returned
320         if( msg->header == NULL ) {
321                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
322                 msg->state = RMR_ERR_NOHDR;
323                 msg->tp_state = errno;
324                 return msg;
325         }
326
327         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
328
329         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
330         if( ! sock_ok ) {
331                 if( (nn_sock = msg->rts_fd) < 0 ) {
332                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
333                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
334                         }
335                         if( ! sock_ok ) {
336                                 msg->state = RMR_ERR_NOENDPT;
337                                 return msg;
338                         }
339                 }
340         }
341
342
343         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
344         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
345         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
346         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
347         msg = send_msg( ctx, msg, nn_sock, -1 );
348         if( msg ) {
349                 if( ep != NULL ) {
350                         switch( msg->state ) {
351                                 case RMR_OK:
352                                         ep->scounts[EPSC_GOOD]++;
353                                         break;
354
355                                 case RMR_ERR_RETRY:
356                                         ep->scounts[EPSC_TRANS]++;
357                                         break;
358
359                                 default:
360                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
361                                         ep->scounts[EPSC_FAIL]++;
362                                         break;
363                         }
364                 }
365                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
366                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
367                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
368         }
369
370         free( hold_src );
371         free( hold_ip );
372         return msg;
373 }
374
375 /*
376         If multi-threading call is turned on, this invokes that mechanism with the special call
377         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
378         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
379         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
380         a flexible timeout.
381
382         On timeout this function will return a nil pointer. If the original message could not
383         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
384
385         Original behavour:
386         Call sends the message based on message routing using the message type, and waits for a
387         response message to arrive with the same transaction id that was in the outgoing message.
388         If, while wiating for the expected response,  messages are received which do not have the
389         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
390         order that they were received.
391
392         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
393         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
394         may be resent (likely the context pointer was nil).  If the message is sent, but no
395         response is received, a nil message is returned with errno set to indicate the likley
396         issue:
397                 ETIMEDOUT -- too many messages were queued before reciving the expected response
398                 ENOBUFS -- the queued message ring is full, messages were dropped
399                 EINVAL  -- A parameter was not valid
400                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
401                                         user should call this function with the message again.
402
403 */
404 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
405         uta_ctx_t*              ctx;
406
407         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
408                 if( msg != NULL ) {
409                         msg->state = RMR_ERR_BADARG;
410                 }
411                 return msg;
412         }
413
414     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
415 }
416
417 /*
418         The outward facing receive function. When invoked it will pop the oldest message
419         from the receive ring, if any are queued, and return it. If the ring is empty
420         then the receive function is invoked to wait for the next message to arrive (blocking).
421
422         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
423         nil, a new one will be allocated. However, the caller should NOT expect to get the same
424         struct back (if a queued message is returned the message struct will be different).
425 */
426 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
427         uta_ctx_t*      ctx;
428         rmr_mbuf_t*     qm;                             // message that was queued on the ring
429
430         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
431                 errno = EINVAL;
432                 if( old_msg != NULL ) {
433                         old_msg->state = RMR_ERR_BADARG;
434                         old_msg->tp_state = errno;
435                 }
436                 return old_msg;
437         }
438         errno = 0;
439
440         return rmr_mt_rcv( ctx, old_msg, -1 );
441 }
442
443 /*
444         This allows a timeout based receive for applications unable to implement epoll_wait()
445         (e.g. wrappers).
446 */
447 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
448         uta_ctx_t*      ctx;
449
450         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
451                 errno = EINVAL;
452                 if( old_msg != NULL ) {
453                         old_msg->state = RMR_ERR_BADARG;
454                         old_msg->tp_state = errno;
455                 }
456                 return old_msg;
457         }
458
459         return rmr_mt_rcv( ctx, old_msg, ms_to );
460 }
461
462 /*
463         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
464                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
465                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
466
467         This blocks until the message with the 'expect' ID is received. Messages which are received
468         before the expected message are queued onto the message ring.  The function will return
469         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
470         expected message is received. If the queued message ring fills a nil pointer is returned
471         and errno is set to ENOBUFS.
472
473         Generally this will be invoked only by the call() function as it waits for a response, but
474         it is exposed to the user application as three is no reason not to.
475 */
476 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
477         uta_ctx_t*      ctx;
478         int     queued = 0;                             // number we pushed into the ring
479         int     exp_len = 0;                    // length of expected ID
480
481         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
482                 errno = EINVAL;
483                 if( msg != NULL ) {
484                         msg->state = RMR_ERR_BADARG;
485                         msg->tp_state = errno;
486                 }
487                 return msg;
488         }
489
490         errno = 0;
491
492         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
493                 return rmr_rcv_msg( ctx, msg );
494         }
495
496         exp_len = strlen( expect );
497         if( exp_len > RMR_MAX_XID ) {
498                 exp_len = RMR_MAX_XID;
499         }
500         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
501
502         while( queued < allow2queue ) {
503                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
504                 if( msg != NULL ) {
505                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
506                         if( msg->state == RMR_OK ) {
507                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
508                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
509                                         return msg;
510                                 }
511
512                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
513                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
514                                         errno = ENOBUFS;
515                                         return NULL;
516                                 }
517
518                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
519                                 queued++;
520                                 msg = NULL;
521                         }
522                 }
523         }
524
525         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
526         errno = ETIMEDOUT;
527         return NULL;
528 }
529
530 /*
531         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
532         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
533         mechnism indicates eagain or etimeedout.  All other error conditions are reported
534         without this delay. Setting a timeout of 0 causes no retries to be attempted in
535         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
536         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
537         after every 1K send attempts until the "time" value is reached. Retries are abandoned
538         if NNG returns anything other than EAGAIN or EINTER is returned.
539
540         The default, if this function is not used, is 1; meaning that RMr will retry, but will
541         not enter a sleep.  In all cases the caller should check the status in the message returned
542         after a send call.
543
544         Returns -1 if the context was invalid; RMR_OK otherwise.
545 */
546 extern int rmr_set_stimeout( void* vctx, int time ) {
547         uta_ctx_t*      ctx;
548
549         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
550                 return -1;
551         }
552
553         if( time < 0 ) {
554                 time = 0;
555         }
556
557         ctx->send_retries = time;
558         return RMR_OK;
559 }
560
561 /*
562         Set receive timeout -- not supported in nng implementation
563
564         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
565 */
566 extern int rmr_set_rtimeout( void* vctx, int time ) {
567         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
568         return 0;
569 }
570
571
572 /*
573         This is the actual init workhorse. The user visible function meerly ensures that the
574         calling programme does NOT set any internal flags that are supported, and then
575         invokes this.  Internal functions (the route table collector) which need additional
576         open ports without starting additional route table collectors, will invoke this
577         directly with the proper flag.
578
579         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
580                                 that we know about. The _user_ should ensure that the supplied length also
581                                 includes the trace data length maximum as they are in control of that.
582 */
583 static void* init( char* uproto_port, int def_msg_size, int flags ) {
584         static  int announced = 0;
585         uta_ctx_t*      ctx = NULL;
586         char    bind_info[256];                         // bind info
587         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
588         char*   port;                                           // pointer into the proto_port buffer at the port value
589         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
590         char*   proto_port;
591         char    wbuf[1024];                                     // work buffer
592         char*   tok;                                            // pointer at token in a buffer
593         char*   tok2;
594         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
595         int             state;
596         int             i;
597         int             old_vlevel;
598
599         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
600
601         if( ! announced ) {
602                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
603                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
604                         uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
605                 announced = 1;
606
607                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
608                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
609         }
610
611         errno = 0;
612         if( uproto_port == NULL ) {
613                 proto_port = strdup( DEF_COMM_PORT );
614                 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
615         } else {
616                 proto_port = strdup( uproto_port );             // so we can modify it
617         }
618
619         if ( proto_port == NULL ){
620                 errno = ENOMEM;
621                 return NULL;
622         }
623
624         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
625                 errno = ENOMEM;
626                 goto err;
627         }
628         memset( ctx, 0, sizeof( uta_ctx_t ) );
629
630         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
631         ctx->snarf_rt_fd = -1;
632         ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
633         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
634         ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
635         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
636         for( i = 0; i < ctx->nrivers; i++ ) {
637                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
638         }
639
640         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
641         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
642         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
643         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
644
645         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
646         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
647
648         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
649                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
650                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
651                 uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
652         } else {
653                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
654         }
655         init_mtcall( ctx );                                                             // set up call chutes
656         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
657
658
659         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
660         if( def_msg_size > 0 ) {
661                 ctx->max_plen = def_msg_size;
662         }
663
664         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
665         if( ctx->si_ctx == NULL ) {
666                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
667                 goto err;
668         }
669
670         if( (port = strchr( proto_port, ':' )) != NULL ) {
671                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
672                         port++;
673                 } else {
674                         *(port++) = 0;                  // term proto string and point at port string
675                         proto = proto_port;             // user supplied proto so point at it rather than default
676                 }
677         } else {
678                 port = proto_port;                      // assume something like "1234" was passed
679         }
680         rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
681
682         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
683                 static_rtc = 1;
684         }
685
686         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
687                 tok = strdup( tok );                                    // something we can destroy
688                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
689                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
690                 } else {
691                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
692                 }
693                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
694                         *tok2 = 0;                                                      // make it so
695                 }
696
697                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
698                 free( tok );
699         } else {
700                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
701                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
702                         goto err;
703                 }
704                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
705                         *tok = 0;                                                                       // we don't keep domain portion
706                 }
707         }
708
709         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
710         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
711                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
712                 errno = EINVAL;
713                 goto err;
714         }
715
716         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
717                 if( atoi( tok ) > 0 ) {
718                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
719                 }
720         }
721
722         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
723         if( flags & RMRFL_NAME_ONLY ) {
724                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
725         } else {
726                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
727                 if( ctx->my_ip == NULL ) {
728                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
729                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
730                 }
731         }
732         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
733
734         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
735                 if( *tok == '1' ) {
736                         ctx->flags |= CFL_WARN;                                 // turn on some warnings (not all, just ones that shouldn't impact performance)
737                 }
738         }
739
740
741         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
742                 interface = "0.0.0.0";
743         }
744
745         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
746         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
747                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
748                 goto err;
749         }
750
751                                                                                                 // finish all flag setting before threads to keep helgrind quiet
752         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
753
754
755         // ---------------- setup for route table collector before invoking ----------------------------------
756         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
757         if( ctx->rtgate != NULL ) {
758                 pthread_mutex_init( ctx->rtgate, NULL );
759         }
760
761         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
762         if( ctx->ephash == NULL ) {
763                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
764                 errno = ENOMEM;
765                 goto err;
766         }
767
768         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
769         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
770                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
771         } else {
772                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
773
774                 if( static_rtc ) {
775                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
776                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
777                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
778                         }
779                 } else {
780                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
781                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
782                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
783                         }
784                 }
785         }
786
787         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
788                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
789         }
790
791         free( proto_port );
792         return (void *) ctx;
793
794 err:
795         free( proto_port );
796         free_ctx( ctx );
797         return NULL;
798 }
799
800 /*
801         Initialise the message routing environment. Flags are one of the UTAFL_
802         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
803         (tcp) to be used, then :port is all that is needed.
804
805         At the moment it seems that TCP really is the only viable protocol, but
806         we'll allow flexibility.
807
808         The return value is a void pointer which must be passed to most uta functions. On
809         error, a nil pointer is returned and errno should be set.
810
811         Flags:
812                 No user flags supported (needed) at the moment, but this provides for extension
813                 without drastically changing anything. The user should invoke with RMRFL_NONE to
814                 avoid any misbehavour as there are internal flags which are suported
815 */
816 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
817         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
818 }
819
820 /*
821         This sets the default trace length which will be added to any message buffers
822         allocated.  It can be set at any time, and if rmr_set_trace() is given a
823         trace len that is different than the default allcoated in a message, the message
824         will be resized.
825
826         Returns 0 on failure and 1 on success. If failure, then errno will be set.
827 */
828 extern int rmr_init_trace( void* vctx, int tr_len ) {
829         uta_ctx_t* ctx;
830
831         errno = 0;
832         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
833                 errno = EINVAL;
834                 return 0;
835         }
836
837         ctx->trace_data_len = tr_len;
838         return 1;
839 }
840
841 /*
842         Return true if routing table is initialised etc. and app can send/receive.
843 */
844 extern int rmr_ready( void* vctx ) {
845         uta_ctx_t *ctx;
846
847         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
848                 return FALSE;
849         }
850
851         return ctx->rtable_ready;
852 }
853
854 /*
855         This returns the message queue ring's filedescriptor which can be used for
856         calls to epoll.  The user shouild NOT read, write, or close the fd.
857
858         Returns the file descriptor or -1 on error.
859 */
860 extern int rmr_get_rcvfd( void* vctx ) {
861         uta_ctx_t* ctx;
862         int state;
863
864         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
865                 return -1;
866         }
867
868         return uta_ring_getpfd( ctx->mring );
869 }
870
871
872 /*
873         Clean up things.
874
875         There isn't an si_flush() per se, but we can pause, generate
876         a context switch, which should allow the last sent buffer to
877         flow. There isn't exactly an nng_term/close either, so there
878         isn't much we can do.
879 */
880 extern void rmr_close( void* vctx ) {
881         uta_ctx_t *ctx;
882
883         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
884                 return;
885         }
886
887         if( ctx->seed_rt_fname != NULL ) {
888                 free( ctx->seed_rt_fname );
889         }
890
891         ctx->shutdown = 1;
892
893         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
894
895         // FIX ME -- how to we turn off si; close all sessions etc?
896         //SIclose( ctx->nn_sock );
897
898 }
899
900
901 // ----- multi-threaded call/receive support -------------------------------------------------
902
903 /*
904         Blocks on the receive ring chute semaphore and then reads from the ring
905         when it is tickled.  If max_wait is -1 then the function blocks until
906         a message is ready on the ring. Else max_wait is assumed to be the number
907         of millaseconds to wait before returning a timeout message.
908 */
909 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
910         uta_ctx_t*      ctx;
911         chute_t*        chute;
912         struct timespec ts;                     // time info if we have a timeout
913         long    new_ms;                         // adjusted mu-sec
914         long    seconds = 0;            // max wait seconds
915         long    nano_sec;                       // max wait xlated to nano seconds
916         int             state;
917         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
918
919         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
920                 errno = EINVAL;
921                 if( mbuf ) {
922                         mbuf->state = RMR_ERR_BADARG;
923                         mbuf->tp_state = errno;
924                 }
925                 return mbuf;
926         }
927
928         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
929
930         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
931
932         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
933                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
934                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
935                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
936                         if( ombuf ) {
937                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
938                         }
939                 } else {
940                         mbuf = ombuf;                                           // return original if it was given with timeout status
941                         if( ombuf != NULL ) {
942                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
943                                 mbuf->len = 0;
944                         }
945                 }
946
947                 if( mbuf != NULL ) {
948                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
949                 }
950
951                 return mbuf;
952         }
953
954         if( ombuf ) {
955                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
956                 ombuf->len = 0;
957         }
958         if( max_wait > 0 ) {
959                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
960
961                 if( max_wait > 999 ) {
962                         seconds = max_wait / 1000;
963                         max_wait -= seconds * 1000;
964                         ts.tv_sec += seconds;
965                 }
966                 if( max_wait > 0 ) {
967                         nano_sec = max_wait * 1000000;
968                         ts.tv_nsec += nano_sec;
969                         if( ts.tv_nsec > 999999999 ) {
970                                 ts.tv_nsec -= 999999999;
971                                 ts.tv_sec++;
972                         }
973                 }
974
975                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
976         }
977
978         errno = EINTR;
979         state = -1;
980         while( state < 0 && errno == EINTR ) {
981                 if( seconds ) {
982                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
983                 } else {
984                         state = sem_wait( &chute->barrier );
985                 }
986         }
987
988         if( state < 0 ) {
989                 mbuf = ombuf;                           // return caller's buffer if they passed one in
990         } else {
991                 errno = 0;                                              // interrupted call state could be left; clear
992                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
993                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
994                         mbuf->state = RMR_OK;
995                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
996
997                         if( ombuf ) {
998                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
999                         }
1000                 } else {
1001                         errno = ETIMEDOUT;
1002                         mbuf = ombuf;                           // no buffer, return user's if there
1003                 }
1004         }
1005
1006         if( mbuf ) {
1007                 mbuf->tp_state = errno;
1008         }
1009         return mbuf;
1010 }
1011
1012
1013
1014
1015 /*
1016         This is the work horse for the multi-threaded call() function. It supports
1017         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
1018         for for rmr_mt_call() modulo the caveat below.
1019
1020         If endpoint is given, then we assume that we're not doing normal route table
1021         routing and that we should send directly to that endpoint (probably worm
1022         hole).
1023 */
1024 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1025         rmr_mbuf_t* ombuf;                      // original mbuf passed in
1026         uta_ctx_t*      ctx;
1027         uta_mhdr_t*     hdr;                    // header in the transport buffer
1028         chute_t*        chute;
1029         unsigned char*  d1;                     // d1 data in header
1030         struct timespec ts;                     // time info if we have a timeout
1031         long    new_ms;                         // adjusted mu-sec
1032         long    seconds = 0;            // max wait seconds
1033         long    nano_sec;                       // max wait xlated to nano seconds
1034         int             state;
1035
1036         errno = EINVAL;
1037         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1038                 if( mbuf ) {
1039                         mbuf->tp_state = errno;
1040                         mbuf->state = RMR_ERR_BADARG;
1041                 }
1042                 return mbuf;
1043         }
1044
1045         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1046                 mbuf->state = RMR_ERR_NOTSUPP;
1047                 mbuf->tp_state = errno;
1048                 return mbuf;
1049         }
1050
1051         ombuf = mbuf;                                                                                                   // save to return timeout status with
1052
1053         chute = &ctx->chutes[call_id];
1054         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1055                 rmr_free_msg( chute->mbuf );
1056                 chute->mbuf = NULL;
1057         }
1058
1059         hdr = (uta_mhdr_t *) mbuf->header;
1060         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1061         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1062         d1 = DATA1_ADDR( hdr );
1063         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1064         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1065
1066         if( max_wait >= 0 ) {
1067                 clock_gettime( CLOCK_REALTIME, &ts );
1068
1069                 if( max_wait > 999 ) {
1070                         seconds = max_wait / 1000;
1071                         max_wait -= seconds * 1000;
1072                         ts.tv_sec += seconds;
1073                 }
1074                 if( max_wait > 0 ) {
1075                         nano_sec = max_wait * 1000000;
1076                         ts.tv_nsec += nano_sec;
1077                         if( ts.tv_nsec > 999999999 ) {
1078                                 ts.tv_nsec -= 999999999;
1079                                 ts.tv_sec++;
1080                         }
1081                 }
1082
1083                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1084         }
1085
1086         if( ep == NULL ) {                                                                              // normal routing
1087                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1088         } else {
1089                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1090         }
1091         if( mbuf ) {
1092                 if( mbuf->state != RMR_OK ) {
1093                         mbuf->tp_state = errno;
1094                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1095                 }
1096         }
1097
1098         state = 0;
1099         errno = 0;
1100         while( chute->mbuf == NULL && ! errno ) {
1101                 if( seconds ) {
1102                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1103                 } else {
1104                         state = sem_wait( &chute->barrier );
1105                 }
1106
1107                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1108                         errno = 0;
1109                 }
1110
1111                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1112                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1113                                 rmr_free_msg( chute->mbuf );
1114                                 chute->mbuf = NULL;
1115                                 errno = 0;
1116                         }
1117                 }
1118         }
1119
1120         if( state < 0 ) {
1121                 return NULL;                                    // leave errno as set by sem wait call
1122         }
1123
1124         mbuf = chute->mbuf;
1125         if( mbuf != NULL ) {
1126                 mbuf->state = RMR_OK;
1127         }
1128         chute->mbuf = NULL;
1129
1130         return mbuf;
1131 }
1132
1133 /*
1134         Accept a message buffer and caller ID, send the message and then wait
1135         for the receiver to tickle the semaphore letting us know that a message
1136         has been received. The call_id is a value between 2 and 255, inclusive; if
1137         it's not in this range an error will be returned. Max wait is the amount
1138         of time in millaseconds that the call should block for. If 0 is given
1139         then no timeout is set.
1140
1141         If the mt_call feature has not been initialised, then the attempt to use this
1142         funciton will fail with RMR_ERR_NOTSUPP
1143
1144         If no matching message is received before the max_wait period expires, a
1145         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1146         occurs after the message has been sent, then a nil pointer is returned
1147         with errno set to some other value.
1148
1149         This is now just an outward facing wrapper so we can support wormhole calls.
1150 */
1151 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1152
1153         // must vet call_id here, all others vetted by workhorse mt_call() function
1154         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1155                 if( mbuf != NULL ) {
1156                         mbuf->state = RMR_ERR_BADARG;
1157                         mbuf->tp_state = EINVAL;
1158                 }
1159                 return mbuf;
1160         }
1161
1162         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1163 }
1164
1165
1166 /*
1167         Given an existing message buffer, reallocate the payload portion to
1168         be at least new_len bytes.  The message header will remain such that
1169         the caller may use the rmr_rts_msg() function to return a payload
1170         to the sender.
1171
1172         The mbuf passed in may or may not be reallocated and the caller must
1173         use the returned pointer and should NOT assume that it can use the
1174         pointer passed in with the exceptions based on the clone flag.
1175
1176         If the clone flag is set, then a duplicated message, with larger payload
1177         size, is allocated and returned.  The old_msg pointer in this situation is
1178         still valid and must be explicitly freed by the application. If the clone
1179         message is not set (0), then any memory management of the old message is
1180         handled by the function.
1181
1182         If the copy flag is set, the contents of the old message's payload is
1183         copied to the reallocated payload.  If the flag is not set, then the
1184         contents of the payload is undetermined.
1185 */
1186 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1187         if( old_msg == NULL ) {
1188                 return NULL;
1189         }
1190
1191         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1192 }
1193
1194 /*
1195         Enable low latency things in the transport (when supported).
1196 */
1197 extern void rmr_set_low_latency( void* vctx ) {
1198         uta_ctx_t*      ctx;
1199
1200         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1201                 if( ctx->si_ctx != NULL ) {
1202                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1203                 }
1204         }
1205 }
1206
1207 /*
1208         Turn on fast acks.
1209 */
1210 extern void rmr_set_fack( void* vctx ) {
1211         uta_ctx_t*      ctx;
1212
1213         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1214                 if( ctx->si_ctx != NULL ) {
1215                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1216                 }
1217         }
1218 }
1219