Add ability to save route table updates to disk
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "alarm.c"
71 #include "rtc_static.c"                         // route table collector (thread code)
72 #include "tools_static.c"
73 #include "sr_si_static.c"                       // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_si_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ){
88                         free( ctx->rtg_addr );
89                 }
90                 uta_ring_free( ctx->mring );
91                 uta_ring_free( ctx->zcb_mring );
92                 if( ctx->chutes ){
93                         free( ctx->chutes );
94                 }
95                 if( ctx->fd2ep ){
96                         rmr_sym_free( ctx->fd2ep );
97                 }
98                 if( ctx->my_name ){
99                         free( ctx->my_name );
100                 }
101                 if( ctx->my_ip ){
102                         free( ctx->my_ip );
103                 }
104                 if( ctx->rtable ){
105                         rmr_sym_free( ctx->rtable->hash );
106                         free( ctx->rtable );
107                 }
108                 if ( ctx->ephash ){
109                         free( ctx->ephash );
110                 }
111                 free( ctx );
112         }
113 }
114
115 // --------------- public functions --------------------------------------------------------------------------
116
117 /*
118         Returns the size of the payload (bytes) that the msg buffer references.
119         Len in a message is the number of bytes which were received, or should
120         be transmitted, however, it is possible that the mbuf was allocated
121         with a larger payload space than the payload length indicates; this
122         function returns the absolute maximum space that the user has available
123         in the payload. On error (bad msg buffer) -1 is returned and errno should
124         indicate the rason.
125
126         The allocated len stored in the msg is:
127                 transport header length +
128                 message header +
129                 user requested payload
130
131         The msg header is a combination of the fixed RMR header and the variable
132         trace data and d2 fields which may vary for each message.
133 */
134 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
135         if( msg == NULL || msg->header == NULL ) {
136                 errno = EINVAL;
137                 return -1;
138         }
139
140         errno = 0;
141         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
142 }
143
144 /*
145         Allocates a send message as a zerocopy message allowing the underlying message protocol
146         to send the buffer without copy.
147 */
148 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
149         uta_ctx_t*      ctx;
150         rmr_mbuf_t*     m;
151
152         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
153                 return NULL;
154         }
155
156         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
157         return  m;
158 }
159
160
161 /*
162         Allocates a send message as a zerocopy message allowing the underlying message protocol
163         to send the buffer without copy. In addition, a trace data field of tr_size will be
164         added and the supplied data coppied to the buffer before returning the message to
165         the caller.
166 */
167 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
168         uta_ctx_t*      ctx;
169         rmr_mbuf_t*     m;
170         int state;
171
172         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
173                 return NULL;
174         }
175
176         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
177         if( m != NULL ) {
178                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
179                 if( state != tr_size ) {
180                         m->state = RMR_ERR_INITFAILED;
181                 }
182         }
183
184         return  m;
185 }
186
187 /*
188         This provides an external path to the realloc static function as it's called by an
189         outward facing mbuf api function. Used to reallocate a message with a different
190         trace data size.
191
192         User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
193         is still valid following this call. The caller is reponsible for maintainting
194         a pointer to both old and new messages and invoking rmr_free_msg() on both!
195 */
196 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
197         return realloc_msg( msg, new_tr_size );
198 }
199
200
201 /*
202         Return the message to the available pool, or free it outright.
203 */
204 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
205         if( mbuf == NULL ) {
206                 fprintf( stderr, ">>>FREE  nil buffer\n" );
207                 return;
208         }
209
210 #ifdef KEEP
211         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
212                 ! mbuf->ring ||                                                                         // cant cache if no ring
213                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
214
215                 if( mbuf->tp_buf ) {
216                         free( mbuf->tp_buf );
217                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
218                 }
219
220                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
221                 free( mbuf );
222         }
223 #else
224         // always free, never manage a pool
225         if( mbuf->tp_buf ) {
226                 free( mbuf->tp_buf );
227                 mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
228         }
229
230         mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
231         free( mbuf );
232 #endif
233 }
234
235 /*
236         This is a wrapper to the real timeout send. We must wrap it now to ensure that
237         the call flag and call-id are reset
238 */
239 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
240         char* d1;                                                                                                                       // point at the call-id in the header
241
242         if( msg != NULL ) {
243                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
244
245                 d1 = DATA1_ADDR( msg->header );
246                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
247         }
248
249         return mtosend_msg( vctx, msg, max_to );
250 }
251
252 /*
253         Send with default max timeout as is set in the context.
254         See rmr_mtosend_msg() for more details on the parameters.
255         See rmr_stimeout() for info on setting the default timeout.
256 */
257 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
258         char* d1;                                                                                                               // point at the call-id in the header
259
260         if( msg != NULL ) {
261                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
262
263                 d1 = DATA1_ADDR( msg->header );
264                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
265         }
266
267         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
268 }
269
270 /*
271         Return to sender allows a message to be sent back to the endpoint where it originated.
272
273         With SI95 it was thought that the return to sender would be along the same open conneciton
274         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
275         applications sending at high message rates, returning responses on the same connection
276         causes major strife. Thus the decision was made to use the same method as NNG and just
277         open a second connection for reverse path.
278
279         We will attempt to use the name in the received message to look up the endpoint. If
280         that failes, then we will write on the connection that the message arrived on as a
281         falback.
282
283         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
284         and on error it can be passed back to this function to retry the send if desired. On error,
285         errno will liklely have the failure reason set by the nng send processing.  The following
286         are possible values for the state in the message buffer:
287
288         Message states returned:
289                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
290                 RMR_ERR_NOHDR  - message did not have a header
291                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
292                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
293                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
294
295         A nil message as the return value is rare, and generally indicates some kind of horrible
296         failure. The value of errno might give a clue as to what is wrong.
297
298         CAUTION:
299                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
300                 The caller must check for this and handle it properly.
301 */
302 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
303         int                     nn_sock;                        // endpoint socket for send
304         uta_ctx_t*      ctx;
305         char*           hold_src;                       // we need the original source if send fails
306         char*           hold_ip;                        // also must hold original ip
307         int                     sock_ok = 0;            // true if we found a valid endpoint socket
308         endpoint_t*     ep = NULL;                      // end point to track counts
309
310         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
311                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
312                 if( msg != NULL ) {
313                         msg->state = RMR_ERR_BADARG;
314                         msg->tp_state = errno;
315                 }
316                 return msg;
317         }
318
319         errno = 0;                                                                                                              // at this point any bad state is in msg returned
320         if( msg->header == NULL ) {
321                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
322                 msg->state = RMR_ERR_NOHDR;
323                 msg->tp_state = errno;
324                 return msg;
325         }
326
327         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
328
329         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
330         if( ! sock_ok ) {
331                 if( (nn_sock = msg->rts_fd) < 0 ) {
332                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
333                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
334                         }
335                         if( ! sock_ok ) {
336                                 msg->state = RMR_ERR_NOENDPT;
337                                 return msg;
338                         }
339                 }
340         }
341
342
343         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
344         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
345         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
346         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
347         msg = send_msg( ctx, msg, nn_sock, -1 );
348         if( msg ) {
349                 if( ep != NULL ) {
350                         switch( msg->state ) {
351                                 case RMR_OK:
352                                         ep->scounts[EPSC_GOOD]++;
353                                         break;
354
355                                 case RMR_ERR_RETRY:
356                                         ep->scounts[EPSC_TRANS]++;
357                                         break;
358
359                                 default:
360                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
361                                         ep->scounts[EPSC_FAIL]++;
362                                         break;
363                         }
364                 }
365                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
366                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
367                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
368         }
369
370         free( hold_src );
371         free( hold_ip );
372         return msg;
373 }
374
375 /*
376         If multi-threading call is turned on, this invokes that mechanism with the special call
377         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
378         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
379         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
380         a flexible timeout.
381
382         On timeout this function will return a nil pointer. If the original message could not
383         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
384
385         Original behavour:
386         Call sends the message based on message routing using the message type, and waits for a
387         response message to arrive with the same transaction id that was in the outgoing message.
388         If, while wiating for the expected response,  messages are received which do not have the
389         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
390         order that they were received.
391
392         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
393         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
394         may be resent (likely the context pointer was nil).  If the message is sent, but no
395         response is received, a nil message is returned with errno set to indicate the likley
396         issue:
397                 ETIMEDOUT -- too many messages were queued before reciving the expected response
398                 ENOBUFS -- the queued message ring is full, messages were dropped
399                 EINVAL  -- A parameter was not valid
400                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
401                                         user should call this function with the message again.
402
403 */
404 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
405         uta_ctx_t*              ctx;
406
407         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
408                 if( msg != NULL ) {
409                         msg->state = RMR_ERR_BADARG;
410                 }
411                 return msg;
412         }
413
414     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
415 }
416
417 /*
418         The outward facing receive function. When invoked it will pop the oldest message
419         from the receive ring, if any are queued, and return it. If the ring is empty
420         then the receive function is invoked to wait for the next message to arrive (blocking).
421
422         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
423         nil, a new one will be allocated. However, the caller should NOT expect to get the same
424         struct back (if a queued message is returned the message struct will be different).
425 */
426 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
427         uta_ctx_t*      ctx;
428         rmr_mbuf_t*     qm;                             // message that was queued on the ring
429
430         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
431                 errno = EINVAL;
432                 if( old_msg != NULL ) {
433                         old_msg->state = RMR_ERR_BADARG;
434                         old_msg->tp_state = errno;
435                 }
436                 return old_msg;
437         }
438         errno = 0;
439
440         return rmr_mt_rcv( ctx, old_msg, -1 );
441 }
442
443 /*
444         This allows a timeout based receive for applications unable to implement epoll_wait()
445         (e.g. wrappers).
446 */
447 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
448         uta_ctx_t*      ctx;
449
450         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
451                 errno = EINVAL;
452                 if( old_msg != NULL ) {
453                         old_msg->state = RMR_ERR_BADARG;
454                         old_msg->tp_state = errno;
455                 }
456                 return old_msg;
457         }
458
459         return rmr_mt_rcv( ctx, old_msg, ms_to );
460 }
461
462 /*
463         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
464                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
465                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
466
467         This blocks until the message with the 'expect' ID is received. Messages which are received
468         before the expected message are queued onto the message ring.  The function will return
469         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
470         expected message is received. If the queued message ring fills a nil pointer is returned
471         and errno is set to ENOBUFS.
472
473         Generally this will be invoked only by the call() function as it waits for a response, but
474         it is exposed to the user application as three is no reason not to.
475 */
476 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
477         uta_ctx_t*      ctx;
478         int     queued = 0;                             // number we pushed into the ring
479         int     exp_len = 0;                    // length of expected ID
480
481         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
482                 errno = EINVAL;
483                 if( msg != NULL ) {
484                         msg->state = RMR_ERR_BADARG;
485                         msg->tp_state = errno;
486                 }
487                 return msg;
488         }
489
490         errno = 0;
491
492         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
493                 return rmr_rcv_msg( ctx, msg );
494         }
495
496         exp_len = strlen( expect );
497         if( exp_len > RMR_MAX_XID ) {
498                 exp_len = RMR_MAX_XID;
499         }
500         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
501
502         while( queued < allow2queue ) {
503                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
504                 if( msg != NULL ) {
505                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
506                         if( msg->state == RMR_OK ) {
507                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
508                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
509                                         return msg;
510                                 }
511
512                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
513                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
514                                         errno = ENOBUFS;
515                                         return NULL;
516                                 }
517
518                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
519                                 queued++;
520                                 msg = NULL;
521                         }
522                 }
523         }
524
525         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
526         errno = ETIMEDOUT;
527         return NULL;
528 }
529
530 /*
531         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
532         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
533         mechnism indicates eagain or etimeedout.  All other error conditions are reported
534         without this delay. Setting a timeout of 0 causes no retries to be attempted in
535         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
536         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
537         after every 1K send attempts until the "time" value is reached. Retries are abandoned
538         if NNG returns anything other than EAGAIN or EINTER is returned.
539
540         The default, if this function is not used, is 1; meaning that RMr will retry, but will
541         not enter a sleep.  In all cases the caller should check the status in the message returned
542         after a send call.
543
544         Returns -1 if the context was invalid; RMR_OK otherwise.
545 */
546 extern int rmr_set_stimeout( void* vctx, int time ) {
547         uta_ctx_t*      ctx;
548
549         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
550                 return -1;
551         }
552
553         if( time < 0 ) {
554                 time = 0;
555         }
556
557         ctx->send_retries = time;
558         return RMR_OK;
559 }
560
561 /*
562         Set receive timeout -- not supported in nng implementation
563
564         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
565 */
566 extern int rmr_set_rtimeout( void* vctx, int time ) {
567         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
568         return 0;
569 }
570
571
572 /*
573         This is the actual init workhorse. The user visible function meerly ensures that the
574         calling programme does NOT set any internal flags that are supported, and then
575         invokes this.  Internal functions (the route table collector) which need additional
576         open ports without starting additional route table collectors, will invoke this
577         directly with the proper flag.
578
579         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
580                                 that we know about. The _user_ should ensure that the supplied length also
581                                 includes the trace data length maximum as they are in control of that.
582 */
583 static void* init( char* uproto_port, int def_msg_size, int flags ) {
584         static  int announced = 0;
585         uta_ctx_t*      ctx = NULL;
586         char    bind_info[256];                         // bind info
587         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
588         char*   port;                                           // pointer into the proto_port buffer at the port value
589         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
590         char*   proto_port;
591         char    wbuf[1024];                                     // work buffer
592         char*   tok;                                            // pointer at token in a buffer
593         char*   tok2;
594         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
595         int             state;
596         int             i;
597         int             old_vlevel;
598
599         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
600
601         if( ! announced ) {
602                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
603                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
604                         uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
605                 announced = 1;
606
607                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
608                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
609         }
610
611         errno = 0;
612         if( uproto_port == NULL ) {
613                 proto_port = strdup( DEF_COMM_PORT );
614                 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
615         } else {
616                 proto_port = strdup( uproto_port );             // so we can modify it
617         }
618
619         if ( proto_port == NULL ){
620                 errno = ENOMEM;
621                 return NULL;
622         }
623
624         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
625                 errno = ENOMEM;
626                 goto err;
627         }
628         memset( ctx, 0, sizeof( uta_ctx_t ) );
629
630         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
631         ctx->snarf_rt_fd = -1;
632         ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
633         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
634         ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
635         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
636         for( i = 0; i < ctx->nrivers; i++ ) {
637                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
638         }
639
640         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
641         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
642         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
643         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
644
645         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
646         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
647
648         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
649                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
650                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
651                 uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
652         } else {
653                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
654         }
655         init_mtcall( ctx );                                                             // set up call chutes
656         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
657
658
659         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
660         if( def_msg_size > 0 ) {
661                 ctx->max_plen = def_msg_size;
662         }
663
664         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
665         if( ctx->si_ctx == NULL ) {
666                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
667                 goto err;
668         }
669
670         if( (port = strchr( proto_port, ':' )) != NULL ) {
671                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
672                         port++;
673                 } else {
674                         *(port++) = 0;                  // term proto string and point at port string
675                         proto = proto_port;             // user supplied proto so point at it rather than default
676                 }
677         } else {
678                 port = proto_port;                      // assume something like "1234" was passed
679         }
680         rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
681
682         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
683                 static_rtc = 1;
684         }
685
686         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
687                 tok = strdup( tok );                                    // something we can destroy
688                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
689                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
690                 } else {
691                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
692                 }
693                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
694                         *tok2 = 0;                                                      // make it so
695                 }
696
697                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
698                 free( tok );
699         } else {
700                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
701                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
702                         goto err;
703                 }
704                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
705                         *tok = 0;                                                                       // we don't keep domain portion
706                 }
707         }
708
709         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
710         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
711                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
712                 errno = EINVAL;
713                 goto err;
714         }
715
716         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
717                 if( atoi( tok ) > 0 ) {
718                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
719                 }
720         }
721
722         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
723         if( flags & RMRFL_NAME_ONLY ) {
724                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
725         } else {
726                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
727                 if( ctx->my_ip == NULL ) {
728                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
729                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
730                 }
731         }
732         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
733
734         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
735                 if( *tok == '1' ) {
736                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
737                 }
738         }
739
740
741         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
742                 interface = "0.0.0.0";
743         }
744
745         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
746         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
747                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
748                 goto err;
749         }
750
751                                                                                                 // finish all flag setting before threads to keep helgrind quiet
752         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
753
754
755         // ---------------- setup for route table collector before invoking ----------------------------------
756         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
757         if( ctx->rtgate != NULL ) {
758                 pthread_mutex_init( ctx->rtgate, NULL );
759         }
760
761         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
762         if( ctx->ephash == NULL ) {
763                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
764                 errno = ENOMEM;
765                 goto err;
766         }
767
768         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
769         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
770                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
771         } else {
772                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
773
774                 if( static_rtc ) {
775                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
776                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
777                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
778                         }
779                 } else {
780                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
781                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
782                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
783                         }
784                 }
785         }
786
787         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
788                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
789         }
790
791         free( proto_port );
792         return (void *) ctx;
793
794 err:
795         free( proto_port );
796         free_ctx( ctx );
797         return NULL;
798 }
799
800 /*
801         Initialise the message routing environment. Flags are one of the UTAFL_
802         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
803         (tcp) to be used, then :port is all that is needed.
804
805         At the moment it seems that TCP really is the only viable protocol, but
806         we'll allow flexibility.
807
808         The return value is a void pointer which must be passed to most uta functions. On
809         error, a nil pointer is returned and errno should be set.
810
811         Flags:
812                 No user flags supported (needed) at the moment, but this provides for extension
813                 without drastically changing anything. The user should invoke with RMRFL_NONE to
814                 avoid any misbehavour as there are internal flags which are suported
815 */
816 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
817         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
818 }
819
820 /*
821         This sets the default trace length which will be added to any message buffers
822         allocated.  It can be set at any time, and if rmr_set_trace() is given a
823         trace len that is different than the default allcoated in a message, the message
824         will be resized.
825
826         Returns 0 on failure and 1 on success. If failure, then errno will be set.
827 */
828 extern int rmr_init_trace( void* vctx, int tr_len ) {
829         uta_ctx_t* ctx;
830
831         errno = 0;
832         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
833                 errno = EINVAL;
834                 return 0;
835         }
836
837         ctx->trace_data_len = tr_len;
838         return 1;
839 }
840
841 /*
842         Return true if routing table is initialised etc. and app can send/receive.
843 */
844 extern int rmr_ready( void* vctx ) {
845         uta_ctx_t *ctx;
846
847         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
848                 return FALSE;
849         }
850
851         return ctx->rtable_ready;
852 }
853
854 /*
855         This returns the message queue ring's filedescriptor which can be used for
856         calls to epoll.  The user shouild NOT read, write, or close the fd.
857
858         Returns the file descriptor or -1 on error.
859 */
860 extern int rmr_get_rcvfd( void* vctx ) {
861         uta_ctx_t* ctx;
862         int state;
863
864         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
865                 return -1;
866         }
867
868         return uta_ring_getpfd( ctx->mring );
869 }
870
871
872 /*
873         Clean up things.
874
875         There isn't an si_flush() per se, but we can pause, generate
876         a context switch, which should allow the last sent buffer to
877         flow. There isn't exactly an nng_term/close either, so there
878         isn't much we can do.
879 */
880 extern void rmr_close( void* vctx ) {
881         uta_ctx_t *ctx;
882
883         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
884                 return;
885         }
886
887         ctx->shutdown = 1;
888
889         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
890
891         // FIX ME -- how to we turn off si; close all sessions etc?
892         //SIclose( ctx->nn_sock );
893
894 }
895
896
897 // ----- multi-threaded call/receive support -------------------------------------------------
898
899 /*
900         Blocks on the receive ring chute semaphore and then reads from the ring
901         when it is tickled.  If max_wait is -1 then the function blocks until
902         a message is ready on the ring. Else max_wait is assumed to be the number
903         of millaseconds to wait before returning a timeout message.
904 */
905 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
906         uta_ctx_t*      ctx;
907         chute_t*        chute;
908         struct timespec ts;                     // time info if we have a timeout
909         long    new_ms;                         // adjusted mu-sec
910         long    seconds = 0;            // max wait seconds
911         long    nano_sec;                       // max wait xlated to nano seconds
912         int             state;
913         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
914
915         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
916                 errno = EINVAL;
917                 if( mbuf ) {
918                         mbuf->state = RMR_ERR_BADARG;
919                         mbuf->tp_state = errno;
920                 }
921                 return mbuf;
922         }
923
924         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
925
926         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
927
928         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
929                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
930                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
931                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
932                         if( ombuf ) {
933                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
934                         }
935                 } else {
936                         mbuf = ombuf;                                           // return original if it was given with timeout status
937                         if( ombuf != NULL ) {
938                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
939                                 mbuf->len = 0;
940                         }
941                 }
942
943                 if( mbuf != NULL ) {
944                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
945                 }
946
947                 return mbuf;
948         }
949
950         if( ombuf ) {
951                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
952                 ombuf->len = 0;
953         }
954         if( max_wait > 0 ) {
955                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
956
957                 if( max_wait > 999 ) {
958                         seconds = max_wait / 1000;
959                         max_wait -= seconds * 1000;
960                         ts.tv_sec += seconds;
961                 }
962                 if( max_wait > 0 ) {
963                         nano_sec = max_wait * 1000000;
964                         ts.tv_nsec += nano_sec;
965                         if( ts.tv_nsec > 999999999 ) {
966                                 ts.tv_nsec -= 999999999;
967                                 ts.tv_sec++;
968                         }
969                 }
970
971                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
972         }
973
974         errno = EINTR;
975         state = -1;
976         while( state < 0 && errno == EINTR ) {
977                 if( seconds ) {
978                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
979                 } else {
980                         state = sem_wait( &chute->barrier );
981                 }
982         }
983
984         if( state < 0 ) {
985                 mbuf = ombuf;                           // return caller's buffer if they passed one in
986         } else {
987                 errno = 0;                                              // interrupted call state could be left; clear
988                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
989                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
990                         mbuf->state = RMR_OK;
991                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
992
993                         if( ombuf ) {
994                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
995                         }
996                 } else {
997                         errno = ETIMEDOUT;
998                         mbuf = ombuf;                           // no buffer, return user's if there
999                 }
1000         }
1001
1002         if( mbuf ) {
1003                 mbuf->tp_state = errno;
1004         }
1005         return mbuf;
1006 }
1007
1008
1009
1010
1011 /*
1012         This is the work horse for the multi-threaded call() function. It supports
1013         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
1014         for for rmr_mt_call() modulo the caveat below.
1015
1016         If endpoint is given, then we assume that we're not doing normal route table
1017         routing and that we should send directly to that endpoint (probably worm
1018         hole).
1019 */
1020 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1021         rmr_mbuf_t* ombuf;                      // original mbuf passed in
1022         uta_ctx_t*      ctx;
1023         uta_mhdr_t*     hdr;                    // header in the transport buffer
1024         chute_t*        chute;
1025         unsigned char*  d1;                     // d1 data in header
1026         struct timespec ts;                     // time info if we have a timeout
1027         long    new_ms;                         // adjusted mu-sec
1028         long    seconds = 0;            // max wait seconds
1029         long    nano_sec;                       // max wait xlated to nano seconds
1030         int             state;
1031
1032         errno = EINVAL;
1033         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1034                 if( mbuf ) {
1035                         mbuf->tp_state = errno;
1036                         mbuf->state = RMR_ERR_BADARG;
1037                 }
1038                 return mbuf;
1039         }
1040
1041         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1042                 mbuf->state = RMR_ERR_NOTSUPP;
1043                 mbuf->tp_state = errno;
1044                 return mbuf;
1045         }
1046
1047         ombuf = mbuf;                                                                                                   // save to return timeout status with
1048
1049         chute = &ctx->chutes[call_id];
1050         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1051                 rmr_free_msg( chute->mbuf );
1052                 chute->mbuf = NULL;
1053         }
1054
1055         hdr = (uta_mhdr_t *) mbuf->header;
1056         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1057         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1058         d1 = DATA1_ADDR( hdr );
1059         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1060         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1061
1062         if( max_wait >= 0 ) {
1063                 clock_gettime( CLOCK_REALTIME, &ts );
1064
1065                 if( max_wait > 999 ) {
1066                         seconds = max_wait / 1000;
1067                         max_wait -= seconds * 1000;
1068                         ts.tv_sec += seconds;
1069                 }
1070                 if( max_wait > 0 ) {
1071                         nano_sec = max_wait * 1000000;
1072                         ts.tv_nsec += nano_sec;
1073                         if( ts.tv_nsec > 999999999 ) {
1074                                 ts.tv_nsec -= 999999999;
1075                                 ts.tv_sec++;
1076                         }
1077                 }
1078
1079                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1080         }
1081
1082         if( ep == NULL ) {                                                                              // normal routing
1083                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1084         } else {
1085                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1086         }
1087         if( mbuf ) {
1088                 if( mbuf->state != RMR_OK ) {
1089                         mbuf->tp_state = errno;
1090                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1091                 }
1092         }
1093
1094         state = 0;
1095         errno = 0;
1096         while( chute->mbuf == NULL && ! errno ) {
1097                 if( seconds ) {
1098                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1099                 } else {
1100                         state = sem_wait( &chute->barrier );
1101                 }
1102
1103                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1104                         errno = 0;
1105                 }
1106
1107                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1108                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1109                                 rmr_free_msg( chute->mbuf );
1110                                 chute->mbuf = NULL;
1111                                 errno = 0;
1112                         }
1113                 }
1114         }
1115
1116         if( state < 0 ) {
1117                 return NULL;                                    // leave errno as set by sem wait call
1118         }
1119
1120         mbuf = chute->mbuf;
1121         if( mbuf != NULL ) {
1122                 mbuf->state = RMR_OK;
1123         }
1124         chute->mbuf = NULL;
1125
1126         return mbuf;
1127 }
1128
1129 /*
1130         Accept a message buffer and caller ID, send the message and then wait
1131         for the receiver to tickle the semaphore letting us know that a message
1132         has been received. The call_id is a value between 2 and 255, inclusive; if
1133         it's not in this range an error will be returned. Max wait is the amount
1134         of time in millaseconds that the call should block for. If 0 is given
1135         then no timeout is set.
1136
1137         If the mt_call feature has not been initialised, then the attempt to use this
1138         funciton will fail with RMR_ERR_NOTSUPP
1139
1140         If no matching message is received before the max_wait period expires, a
1141         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1142         occurs after the message has been sent, then a nil pointer is returned
1143         with errno set to some other value.
1144
1145         This is now just an outward facing wrapper so we can support wormhole calls.
1146 */
1147 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1148
1149         // must vet call_id here, all others vetted by workhorse mt_call() function
1150         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1151                 if( mbuf != NULL ) {
1152                         mbuf->state = RMR_ERR_BADARG;
1153                         mbuf->tp_state = EINVAL;
1154                 }
1155                 return mbuf;
1156         }
1157
1158         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1159 }
1160
1161
1162 /*
1163         Given an existing message buffer, reallocate the payload portion to
1164         be at least new_len bytes.  The message header will remain such that
1165         the caller may use the rmr_rts_msg() function to return a payload
1166         to the sender.
1167
1168         The mbuf passed in may or may not be reallocated and the caller must
1169         use the returned pointer and should NOT assume that it can use the
1170         pointer passed in with the exceptions based on the clone flag.
1171
1172         If the clone flag is set, then a duplicated message, with larger payload
1173         size, is allocated and returned.  The old_msg pointer in this situation is
1174         still valid and must be explicitly freed by the application. If the clone
1175         message is not set (0), then any memory management of the old message is
1176         handled by the function.
1177
1178         If the copy flag is set, the contents of the old message's payload is
1179         copied to the reallocated payload.  If the flag is not set, then the
1180         contents of the payload is undetermined.
1181 */
1182 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1183         if( old_msg == NULL ) {
1184                 return NULL;
1185         }
1186
1187         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1188 }
1189
1190 /*
1191         Enable low latency things in the transport (when supported).
1192 */
1193 extern void rmr_set_low_latency( void* vctx ) {
1194         uta_ctx_t*      ctx;
1195
1196         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1197                 if( ctx->si_ctx != NULL ) {
1198                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1199                 }
1200         }
1201 }
1202
1203 /*
1204         Turn on fast acks.
1205 */
1206 extern void rmr_set_fack( void* vctx ) {
1207         uta_ctx_t*      ctx;
1208
1209         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1210                 if( ctx->si_ctx != NULL ) {
1211                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1212                 }
1213         }
1214 }
1215