54a1bba883ff1af28b0eca702de16bfa83158f93
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2021 Nokia
5         Copyright (c) 2018-2021 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59
60 #define SI95_BUILD      1                       // we drop some common functions for si
61
62 #include "rmr.h"                                // things the users see
63 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
64 #include "rmr_si_private.h"             // things that we need too
65
66 #include "rmr_symtab.h"
67 #include "rmr_logging.h"
68
69 #include "ring_static.c"                        // message ring support
70 #include "rt_generic_static.c"          // route table things not transport specific
71 #include "rtable_si_static.c"           // route table things -- transport specific
72 #include "alarm.c"
73 #include "rtc_static.c"                         // route table collector (thread code)
74 #include "tools_static.c"
75 #include "sr_si_static.c"                       // send/receive static functions
76 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
77 #include "mt_call_static.c"
78 #include "mt_call_si_static.c"
79
80
81 //------------------------------------------------------------------------------
82
83 /*
84         If we have an EP, up the counters based on state.
85         This isn't needed, but it makes driving the code under unit test easier so we
86         induldge in the bean counter's desire for coverage numbers.
87 */
88 static inline void incr_ep_counts( int state, endpoint_t* ep ) {
89         if( ep != NULL ) {
90                 switch( state ) {
91                         case RMR_OK:
92                                 ep->scounts[EPSC_GOOD]++;
93                                 break;
94
95                         case RMR_ERR_RETRY:
96                                 ep->scounts[EPSC_TRANS]++;
97                                 break;
98
99                         default:
100                                 ep->scounts[EPSC_FAIL]++;
101                                 break;
102                 }
103         }
104 }
105
106 /*
107         Clean up a context.
108 */
109 static void free_ctx( uta_ctx_t* ctx ) {
110         if( ctx ) {
111                 if( ctx->rtg_addr ){
112                         free( ctx->rtg_addr );
113                 }
114                 uta_ring_free( ctx->mring );
115                 uta_ring_free( ctx->zcb_mring );
116                 if( ctx->chutes ){
117                         free( ctx->chutes );
118                 }
119                 if( ctx->fd2ep ){
120                         rmr_sym_free( ctx->fd2ep );
121                 }
122                 if( ctx->my_name ){
123                         free( ctx->my_name );
124                 }
125                 if( ctx->my_ip ){
126                         free( ctx->my_ip );
127                 }
128                 if( ctx->rtable ){
129                         rmr_sym_free( ctx->rtable->hash );
130                         free( ctx->rtable );
131                 }
132                 if ( ctx->ephash ){
133                         free( ctx->ephash );
134                 }
135                 free( ctx );
136         }
137 }
138
139 // --------------- public functions --------------------------------------------------------------------------
140
141 /*
142         Returns the size of the payload (bytes) that the msg buffer references.
143         Len in a message is the number of bytes which were received, or should
144         be transmitted, however, it is possible that the mbuf was allocated
145         with a larger payload space than the payload length indicates; this
146         function returns the absolute maximum space that the user has available
147         in the payload. On error (bad msg buffer) -1 is returned and errno should
148         indicate the rason.
149
150         The allocated len stored in the msg is:
151                 transport header length +
152                 message header +
153                 user requested payload
154
155         The msg header is a combination of the fixed RMR header and the variable
156         trace data and d2 fields which may vary for each message.
157 */
158 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
159         if( msg == NULL || msg->header == NULL ) {
160                 errno = EINVAL;
161                 return -1;
162         }
163
164         errno = 0;
165         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
166 }
167
168 /*
169         Allocates a send message as a zerocopy message allowing the underlying message protocol
170         to send the buffer without copy.
171 */
172 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
173         uta_ctx_t*      ctx;
174         rmr_mbuf_t*     m;
175
176         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
177                 return NULL;
178         }
179
180         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
181         return  m;
182 }
183
184
185 /*
186         Allocates a send message as a zerocopy message allowing the underlying message protocol
187         to send the buffer without copy. In addition, a trace data field of tr_size will be
188         added and the supplied data coppied to the buffer before returning the message to
189         the caller.
190 */
191 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
192         uta_ctx_t*      ctx;
193         rmr_mbuf_t*     m;
194         int state;
195
196         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
197                 return NULL;
198         }
199
200         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
201         if( m != NULL ) {
202                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
203                 if( state != tr_size ) {
204                         m->state = RMR_ERR_INITFAILED;
205                 }
206         }
207
208         return  m;
209 }
210
211 /*
212         This provides an external path to the realloc static function as it's called by an
213         outward facing mbuf api function. Used to reallocate a message with a different
214         trace data size.
215
216         User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
217         is still valid following this call. The caller is reponsible for maintainting
218         a pointer to both old and new messages and invoking rmr_free_msg() on both!
219 */
220 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
221         return realloc_msg( msg, new_tr_size );
222 }
223
224
225 /*
226         Return the message to the available pool, or free it outright.
227 */
228 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
229         if( mbuf == NULL ) {
230                 fprintf( stderr, ">>>FREE  nil buffer\n" );
231                 return;
232         }
233
234 #ifdef KEEP
235         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
236                 ! mbuf->ring ||                                                                         // cant cache if no ring
237                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
238
239                 if( mbuf->tp_buf ) {
240                         free( mbuf->tp_buf );
241                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
242                 }
243
244                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
245                 free( mbuf );
246         }
247 #else
248         // always free, never manage a pool
249         if( mbuf->tp_buf ) {
250                 free( mbuf->tp_buf );
251                 mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
252         }
253
254         mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
255         free( mbuf );
256 #endif
257 }
258
259 /*
260         This is a wrapper to the real timeout send. We must wrap it now to ensure that
261         the call flag and call-id are reset
262 */
263 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
264         char* d1;                                                                                                                       // point at the call-id in the header
265
266         if( msg != NULL ) {
267                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
268
269                 d1 = DATA1_ADDR( msg->header );
270                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
271         }
272
273         return mtosend_msg( vctx, msg, max_to );
274 }
275
276 /*
277         Send with default max timeout as is set in the context.
278         See rmr_mtosend_msg() for more details on the parameters.
279         See rmr_stimeout() for info on setting the default timeout.
280 */
281 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
282         char* d1;                                                                                                               // point at the call-id in the header
283
284         if( msg != NULL ) {
285                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
286
287                 d1 = DATA1_ADDR( msg->header );
288                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
289         }
290
291         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
292 }
293
294 /*
295         Return to sender allows a message to be sent back to the endpoint where it originated.
296
297         With SI95 it was thought that the return to sender would be along the same open conneciton
298         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
299         applications sending at high message rates, returning responses on the same connection
300         causes major strife. Thus the decision was made to use the same method as NNG and just
301         open a second connection for reverse path.
302
303         We will attempt to use the name in the received message to look up the endpoint. If
304         that failes, then we will write on the connection that the message arrived on as a
305         falback.
306
307         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
308         and on error it can be passed back to this function to retry the send if desired. On error,
309         errno will liklely have the failure reason set by the nng send processing.  The following
310         are possible values for the state in the message buffer:
311
312         Message states returned:
313                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
314                 RMR_ERR_NOHDR  - message did not have a header
315                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
316                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
317                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
318
319         A nil message as the return value is rare, and generally indicates some kind of horrible
320         failure. The value of errno might give a clue as to what is wrong.
321
322         CAUTION:
323                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
324                 The caller must check for this and handle it properly.
325 */
326 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
327         int                     nn_sock;                        // endpoint socket for send
328         uta_ctx_t*      ctx;
329         char*           hold_src;                       // we need the original source if send fails
330         char*           hold_ip;                        // also must hold original ip
331         int                     sock_ok = 0;            // true if we found a valid endpoint socket
332         endpoint_t*     ep = NULL;                      // end point to track counts
333
334         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
335                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
336                 if( msg != NULL ) {
337                         msg->state = RMR_ERR_BADARG;
338                         msg->tp_state = errno;
339                 }
340                 return msg;
341         }
342
343         errno = 0;                                                                                                              // at this point any bad state is in msg returned
344         if( msg->header == NULL ) {
345                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
346                 msg->state = RMR_ERR_NOHDR;
347                 msg->tp_state = errno;
348                 return msg;
349         }
350
351         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
352
353         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
354         if( ! sock_ok ) {
355                 if( (nn_sock = msg->rts_fd) < 0 ) {
356                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
357                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
358                         }
359                         if( ! sock_ok ) {
360                                 msg->state = RMR_ERR_NOENDPT;
361                                 return msg;
362                         }
363                 }
364         }
365
366         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
367         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
368         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
369         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
370         msg = send_msg( ctx, msg, nn_sock, -1 );
371         if( msg ) {
372                 incr_ep_counts(  msg->state, ep );                              // update counts
373
374                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
375                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
376                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
377         }
378
379         free( hold_src );
380         free( hold_ip );
381         return msg;
382 }
383
384 /*
385         If multi-threading call is turned on, this invokes that mechanism with the special call
386         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
387         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
388         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
389         a flexible timeout.
390
391         On timeout this function will return a nil pointer. If the original message could not
392         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
393
394         Original behavour:
395         Call sends the message based on message routing using the message type, and waits for a
396         response message to arrive with the same transaction id that was in the outgoing message.
397         If, while wiating for the expected response,  messages are received which do not have the
398         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
399         order that they were received.
400
401         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
402         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
403         may be resent (likely the context pointer was nil).  If the message is sent, but no
404         response is received, a nil message is returned with errno set to indicate the likley
405         issue:
406                 ETIMEDOUT -- too many messages were queued before reciving the expected response
407                 ENOBUFS -- the queued message ring is full, messages were dropped
408                 EINVAL  -- A parameter was not valid
409                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
410                                         user should call this function with the message again.
411
412 */
413 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
414         uta_ctx_t*              ctx;
415
416         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
417                 if( msg != NULL ) {
418                         msg->state = RMR_ERR_BADARG;
419                 }
420                 return msg;
421         }
422
423     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
424 }
425
426 /*
427         The outward facing receive function. When invoked it will pop the oldest message
428         from the receive ring, if any are queued, and return it. If the ring is empty
429         then the receive function is invoked to wait for the next message to arrive (blocking).
430
431         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
432         nil, a new one will be allocated. However, the caller should NOT expect to get the same
433         struct back (if a queued message is returned the message struct will be different).
434 */
435 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
436         uta_ctx_t*      ctx;
437         rmr_mbuf_t*     qm;                             // message that was queued on the ring
438
439         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
440                 errno = EINVAL;
441                 if( old_msg != NULL ) {
442                         old_msg->state = RMR_ERR_BADARG;
443                         old_msg->tp_state = errno;
444                 }
445                 return old_msg;
446         }
447         errno = 0;
448
449         return rmr_mt_rcv( ctx, old_msg, -1 );
450 }
451
452 /*
453         This allows a timeout based receive for applications unable to implement epoll_wait()
454         (e.g. wrappers).
455 */
456 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
457         uta_ctx_t*      ctx;
458
459         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
460                 errno = EINVAL;
461                 if( old_msg != NULL ) {
462                         old_msg->state = RMR_ERR_BADARG;
463                         old_msg->tp_state = errno;
464                 }
465                 return old_msg;
466         }
467
468         return rmr_mt_rcv( ctx, old_msg, ms_to );
469 }
470
471 /*
472         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
473                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
474                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
475
476         This blocks until the message with the 'expect' ID is received. Messages which are received
477         before the expected message are queued onto the message ring.  The function will return
478         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
479         expected message is received. If the queued message ring fills a nil pointer is returned
480         and errno is set to ENOBUFS.
481
482         Generally this will be invoked only by the call() function as it waits for a response, but
483         it is exposed to the user application as three is no reason not to.
484 */
485 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
486         uta_ctx_t*      ctx;
487         int     queued = 0;                             // number we pushed into the ring
488         int     exp_len = 0;                    // length of expected ID
489
490         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
491                 errno = EINVAL;
492                 if( msg != NULL ) {
493                         msg->state = RMR_ERR_BADARG;
494                         msg->tp_state = errno;
495                 }
496                 return msg;
497         }
498
499         errno = 0;
500
501         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
502                 return rmr_rcv_msg( ctx, msg );
503         }
504
505         exp_len = strlen( expect );
506         if( exp_len > RMR_MAX_XID ) {
507                 exp_len = RMR_MAX_XID;
508         }
509         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
510
511         while( queued < allow2queue ) {
512                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
513                 if( msg != NULL ) {
514                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
515                         if( msg->state == RMR_OK ) {
516                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
517                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
518                                         return msg;
519                                 }
520
521                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
522                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
523                                         errno = ENOBUFS;
524                                         return NULL;
525                                 }
526
527                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
528                                 queued++;
529                                 msg = NULL;
530                         }
531                 }
532         }
533
534         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
535         errno = ETIMEDOUT;
536         return NULL;
537 }
538
539 /*
540         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
541         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
542         mechnism indicates eagain or etimeedout.  All other error conditions are reported
543         without this delay. Setting a timeout of 0 causes no retries to be attempted in
544         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
545         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
546         after every 1K send attempts until the "time" value is reached. Retries are abandoned
547         if NNG returns anything other than EAGAIN or EINTER is returned.
548
549         The default, if this function is not used, is 1; meaning that RMr will retry, but will
550         not enter a sleep.  In all cases the caller should check the status in the message returned
551         after a send call.
552
553         Returns -1 if the context was invalid; RMR_OK otherwise.
554 */
555 extern int rmr_set_stimeout( void* vctx, int time ) {
556         uta_ctx_t*      ctx;
557
558         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
559                 return -1;
560         }
561
562         if( time < 0 ) {
563                 time = 0;
564         }
565
566         ctx->send_retries = time;
567         return RMR_OK;
568 }
569
570 /*
571         Set receive timeout -- not supported in nng implementation
572
573         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
574 */
575 extern int rmr_set_rtimeout( void* vctx, int time ) {
576         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
577         return 0;
578 }
579
580 /*
581         Common cleanup on initialisation error. These are hard to force, and this helps to ensure
582         all code is tested by providing a callable rather than a block of "goto" code.
583
584         There is a return value so that where we need this we get dinked only for one
585         uncovered line rather than two:
586                 init_err(...);
587                 return NULL;
588
589         That's a hack, and is yet another example of the testing tail wagging the dog.
590 */
591 static inline void* init_err( char* msg, void* ctx, void* port, int errval ) {
592         if( errval != 0 ) {                     // if not letting it be what a sysllib set it to...
593                 errno = errval;
594         }
595
596         if( port ) {                            // free things if allocated
597                 free( port );
598         }
599         if( ctx ) {
600                 free_ctx( ctx );
601         }
602
603         if( msg ) {                                                                     // crit message if supplied
604                 rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) );
605         }
606
607         return NULL;
608 }
609
610 /*
611         This is the actual init workhorse. The user visible function meerly ensures that the
612         calling programme does NOT set any internal flags that are supported, and then
613         invokes this.  Internal functions (the route table collector) which need additional
614         open ports without starting additional route table collectors, will invoke this
615         directly with the proper flag.
616
617         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
618                                 that we know about. The _user_ should ensure that the supplied length also
619                                 includes the trace data length maximum as they are in control of that.
620 */
621 static void* init( char* uproto_port, int def_msg_size, int flags ) {
622         static  int announced = 0;
623         uta_ctx_t*      ctx = NULL;
624         char    bind_info[256];                         // bind info
625         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
626         char*   port;                                           // pointer into the proto_port buffer at the port value
627         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
628         char*   proto_port;
629         char    wbuf[1024];                                     // work buffer
630         char*   tok;                                            // pointer at token in a buffer
631         char*   tok2;
632         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
633         int             state;
634         int             i;
635         int             old_vlevel;
636
637         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
638
639         if( ! announced ) {
640                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
641                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
642                         uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
643                 announced = 1;
644
645                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
646                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
647         }
648
649         errno = 0;
650         if( uproto_port == NULL ) {
651                 proto_port = strdup( DEF_COMM_PORT );
652                 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
653         } else {
654                 proto_port = strdup( uproto_port );             // so we can modify it
655         }
656
657         if ( proto_port == NULL ){
658                 return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM );
659         }
660
661         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
662                 return init_err( "unable to allocate context", ctx, proto_port, ENOMEM );
663         }
664         memset( ctx, 0, sizeof( uta_ctx_t ) );
665
666         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
667         ctx->snarf_rt_fd = -1;
668         ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
669         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
670         ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
671         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
672         for( i = 0; i < ctx->nrivers; i++ ) {
673                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
674         }
675
676         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
677         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
678         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
679         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
680
681         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
682         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
683
684         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
685                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
686                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
687                 uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
688         } else {
689                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
690         }
691         init_mtcall( ctx );                                                             // set up call chutes
692         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
693
694
695         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
696         if( def_msg_size > 0 ) {
697                 ctx->max_plen = def_msg_size;
698         }
699
700         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
701         if( ctx->si_ctx == NULL ) {
702                 return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 );
703         }
704
705         if( (port = strchr( proto_port, ':' )) != NULL ) {
706                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
707                         port++;
708                 } else {
709                         *(port++) = 0;                  // term proto string and point at port string
710                         proto = proto_port;             // user supplied proto so point at it rather than default
711                 }
712         } else {
713                 port = proto_port;                      // assume something like "1234" was passed
714         }
715         rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
716
717         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
718                 static_rtc = 1;
719         }
720
721         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
722                 tok = strdup( tok );                                    // something we can destroy
723                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
724                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
725                 } else {
726                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
727                 }
728                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
729                         *tok2 = 0;                                                      // make it so
730                 }
731
732                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
733                 free( tok );
734         } else {
735                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
736                         return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 );
737                 }
738                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
739                         *tok = 0;                                                                       // we don't keep domain portion
740                 }
741         }
742
743         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
744         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
745                 return init_err( "hostname + port is too long", ctx, proto_port, EINVAL );
746         }
747
748         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
749                 if( atoi( tok ) > 0 ) {
750                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
751                 }
752         }
753
754         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
755         if( flags & RMRFL_NAME_ONLY ) {
756                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
757         } else {
758                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
759                 if( ctx->my_ip == NULL ) {
760                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
761                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
762                 }
763         }
764         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
765
766         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
767                 if( *tok == '1' ) {
768                         ctx->flags |= CFL_WARN;                                 // turn on some warnings (not all, just ones that shouldn't impact performance)
769                 }
770         }
771
772
773         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
774                 interface = "0.0.0.0";
775         }
776
777         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
778         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
779                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
780                 return init_err( NULL, ctx, proto_port, 0 );
781         }
782
783                                                                                                 // finish all flag setting before threads to keep helgrind quiet
784         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
785
786
787         // ---------------- setup for route table collector before invoking ----------------------------------
788         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
789         if( ctx->rtgate != NULL ) {
790                 pthread_mutex_init( ctx->rtgate, NULL );
791         }
792
793         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
794         if( ctx->ephash == NULL ) {
795                 return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM );
796         }
797
798         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
799         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
800                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
801         } else {
802                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
803
804                 if( static_rtc ) {
805                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
806                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
807                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
808                         }
809                 } else {
810                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
811                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
812                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
813                         }
814                 }
815         }
816
817         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
818                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
819         }
820
821         free( proto_port );
822         return (void *) ctx;
823 }
824
825 /*
826         Initialise the message routing environment. Flags are one of the UTAFL_
827         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
828         (tcp) to be used, then :port is all that is needed.
829
830         At the moment it seems that TCP really is the only viable protocol, but
831         we'll allow flexibility.
832
833         The return value is a void pointer which must be passed to most uta functions. On
834         error, a nil pointer is returned and errno should be set.
835
836         Flags:
837                 No user flags supported (needed) at the moment, but this provides for extension
838                 without drastically changing anything. The user should invoke with RMRFL_NONE to
839                 avoid any misbehavour as there are internal flags which are suported
840 */
841 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
842         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
843 }
844
845 /*
846         This sets the default trace length which will be added to any message buffers
847         allocated.  It can be set at any time, and if rmr_set_trace() is given a
848         trace len that is different than the default allcoated in a message, the message
849         will be resized.
850
851         Returns 0 on failure and 1 on success. If failure, then errno will be set.
852 */
853 extern int rmr_init_trace( void* vctx, int tr_len ) {
854         uta_ctx_t* ctx;
855
856         errno = 0;
857         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
858                 errno = EINVAL;
859                 return 0;
860         }
861
862         ctx->trace_data_len = tr_len;
863         return 1;
864 }
865
866 /*
867         Return true if routing table is initialised etc. and app can send/receive.
868 */
869 extern int rmr_ready( void* vctx ) {
870         uta_ctx_t *ctx;
871
872         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
873                 return FALSE;
874         }
875
876         return ctx->rtable_ready;
877 }
878
879 /*
880         This returns the message queue ring's filedescriptor which can be used for
881         calls to epoll.  The user shouild NOT read, write, or close the fd.
882
883         Returns the file descriptor or -1 on error.
884 */
885 extern int rmr_get_rcvfd( void* vctx ) {
886         uta_ctx_t* ctx;
887         int state;
888
889         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
890                 return -1;
891         }
892
893         return uta_ring_getpfd( ctx->mring );
894 }
895
896
897 /*
898         Clean up things.
899
900         There isn't an si_flush() per se, but we can pause, generate
901         a context switch, which should allow the last sent buffer to
902         flow. There isn't exactly an nng_term/close either, so there
903         isn't much we can do.
904 */
905 extern void rmr_close( void* vctx ) {
906         uta_ctx_t *ctx;
907
908         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
909                 return;
910         }
911
912         if( ctx->seed_rt_fname != NULL ) {
913                 free( ctx->seed_rt_fname );
914         }
915
916         ctx->shutdown = 1;
917
918         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
919
920         // FIX ME -- how to we turn off si; close all sessions etc?
921         //SIclose( ctx->nn_sock );
922
923 }
924
925
926 // ----- multi-threaded call/receive support -------------------------------------------------
927
928 /*
929         Blocks on the receive ring chute semaphore and then reads from the ring
930         when it is tickled.  If max_wait is -1 then the function blocks until
931         a message is ready on the ring. Else max_wait is assumed to be the number
932         of millaseconds to wait before returning a timeout message.
933 */
934 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
935         uta_ctx_t*      ctx;
936         chute_t*        chute;
937         struct timespec ts;                     // time info if we have a timeout
938         long    new_ms;                         // adjusted mu-sec
939         long    seconds = 0;            // max wait seconds
940         long    nano_sec;                       // max wait xlated to nano seconds
941         int             state;
942         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
943
944         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
945                 errno = EINVAL;
946                 if( mbuf ) {
947                         mbuf->state = RMR_ERR_BADARG;
948                         mbuf->tp_state = errno;
949                 }
950                 return mbuf;
951         }
952
953         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
954
955         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
956
957         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
958                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
959                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
960                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
961                         if( ombuf ) {
962                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
963                         }
964                 } else {
965                         mbuf = ombuf;                                           // return original if it was given with timeout status
966                         if( ombuf != NULL ) {
967                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
968                                 mbuf->len = 0;
969                         }
970                 }
971
972                 if( mbuf != NULL ) {
973                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
974                 }
975
976                 return mbuf;
977         }
978
979         if( ombuf ) {
980                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
981                 ombuf->len = 0;
982         }
983         if( max_wait > 0 ) {
984                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
985
986                 if( max_wait > 999 ) {
987                         seconds = max_wait / 1000;
988                         max_wait -= seconds * 1000;
989                         ts.tv_sec += seconds;
990                 }
991                 if( max_wait > 0 ) {
992                         nano_sec = max_wait * 1000000;
993                         ts.tv_nsec += nano_sec;
994                         if( ts.tv_nsec > 999999999 ) {
995                                 ts.tv_nsec -= 999999999;
996                                 ts.tv_sec++;
997                         }
998                 }
999
1000                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
1001         }
1002
1003         errno = EINTR;
1004         state = -1;
1005         while( state < 0 && errno == EINTR ) {
1006                 if( seconds ) {
1007                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1008                 } else {
1009                         state = sem_wait( &chute->barrier );
1010                 }
1011         }
1012
1013         if( state < 0 ) {
1014                 mbuf = ombuf;                           // return caller's buffer if they passed one in
1015         } else {
1016                 errno = 0;                                              // interrupted call state could be left; clear
1017                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
1018                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
1019                         mbuf->state = RMR_OK;
1020                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
1021
1022                         if( ombuf ) {
1023                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
1024                         }
1025                 } else {
1026                         errno = ETIMEDOUT;
1027                         mbuf = ombuf;                           // no buffer, return user's if there
1028                 }
1029         }
1030
1031         if( mbuf ) {
1032                 mbuf->tp_state = errno;
1033         }
1034         return mbuf;
1035 }
1036
1037
1038
1039
1040 /*
1041         This is the work horse for the multi-threaded call() function. It supports
1042         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
1043         for for rmr_mt_call() modulo the caveat below.
1044
1045         If endpoint is given, then we assume that we're not doing normal route table
1046         routing and that we should send directly to that endpoint (probably worm
1047         hole).
1048 */
1049 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1050         rmr_mbuf_t* ombuf;                      // original mbuf passed in
1051         uta_ctx_t*      ctx;
1052         uta_mhdr_t*     hdr;                    // header in the transport buffer
1053         chute_t*        chute;
1054         unsigned char*  d1;                     // d1 data in header
1055         struct timespec ts;                     // time info if we have a timeout
1056         long    new_ms;                         // adjusted mu-sec
1057         long    seconds = 0;            // max wait seconds
1058         long    nano_sec;                       // max wait xlated to nano seconds
1059         int             state;
1060
1061         errno = EINVAL;
1062         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1063                 if( mbuf ) {
1064                         mbuf->tp_state = errno;
1065                         mbuf->state = RMR_ERR_BADARG;
1066                 }
1067                 return mbuf;
1068         }
1069
1070         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1071                 mbuf->state = RMR_ERR_NOTSUPP;
1072                 mbuf->tp_state = errno;
1073                 return mbuf;
1074         }
1075
1076         ombuf = mbuf;                                                                                                   // save to return timeout status with
1077
1078         chute = &ctx->chutes[call_id];
1079         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1080                 rmr_free_msg( chute->mbuf );
1081                 chute->mbuf = NULL;
1082         }
1083
1084         hdr = (uta_mhdr_t *) mbuf->header;
1085         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1086         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1087         d1 = DATA1_ADDR( hdr );
1088         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1089         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1090
1091         if( max_wait >= 0 ) {
1092                 clock_gettime( CLOCK_REALTIME, &ts );
1093
1094                 if( max_wait > 999 ) {
1095                         seconds = max_wait / 1000;
1096                         max_wait -= seconds * 1000;
1097                         ts.tv_sec += seconds;
1098                 }
1099                 if( max_wait > 0 ) {
1100                         nano_sec = max_wait * 1000000;
1101                         ts.tv_nsec += nano_sec;
1102                         if( ts.tv_nsec > 999999999 ) {
1103                                 ts.tv_nsec -= 999999999;
1104                                 ts.tv_sec++;
1105                         }
1106                 }
1107
1108                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1109         }
1110
1111         if( ep == NULL ) {                                                                              // normal routing
1112                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1113         } else {
1114                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1115         }
1116         if( mbuf ) {
1117                 if( mbuf->state != RMR_OK ) {
1118                         mbuf->tp_state = errno;
1119                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1120                 }
1121         }
1122
1123         state = 0;
1124         errno = 0;
1125         while( chute->mbuf == NULL && ! errno ) {
1126                 if( seconds ) {
1127                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1128                 } else {
1129                         state = sem_wait( &chute->barrier );
1130                 }
1131
1132                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1133                         errno = 0;
1134                 }
1135
1136                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1137                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1138                                 rmr_free_msg( chute->mbuf );
1139                                 chute->mbuf = NULL;
1140                                 errno = 0;
1141                         }
1142                 }
1143         }
1144
1145         if( state < 0 ) {
1146                 return NULL;                                    // leave errno as set by sem wait call
1147         }
1148
1149         mbuf = chute->mbuf;
1150         if( mbuf != NULL ) {
1151                 mbuf->state = RMR_OK;
1152         }
1153         chute->mbuf = NULL;
1154
1155         return mbuf;
1156 }
1157
1158 /*
1159         Accept a message buffer and caller ID, send the message and then wait
1160         for the receiver to tickle the semaphore letting us know that a message
1161         has been received. The call_id is a value between 2 and 255, inclusive; if
1162         it's not in this range an error will be returned. Max wait is the amount
1163         of time in millaseconds that the call should block for. If 0 is given
1164         then no timeout is set.
1165
1166         If the mt_call feature has not been initialised, then the attempt to use this
1167         funciton will fail with RMR_ERR_NOTSUPP
1168
1169         If no matching message is received before the max_wait period expires, a
1170         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1171         occurs after the message has been sent, then a nil pointer is returned
1172         with errno set to some other value.
1173
1174         This is now just an outward facing wrapper so we can support wormhole calls.
1175 */
1176 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1177
1178         // must vet call_id here, all others vetted by workhorse mt_call() function
1179         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1180                 if( mbuf != NULL ) {
1181                         mbuf->state = RMR_ERR_BADARG;
1182                         mbuf->tp_state = EINVAL;
1183                 }
1184                 return mbuf;
1185         }
1186
1187         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1188 }
1189
1190
1191 /*
1192         Given an existing message buffer, reallocate the payload portion to
1193         be at least new_len bytes.  The message header will remain such that
1194         the caller may use the rmr_rts_msg() function to return a payload
1195         to the sender.
1196
1197         The mbuf passed in may or may not be reallocated and the caller must
1198         use the returned pointer and should NOT assume that it can use the
1199         pointer passed in with the exceptions based on the clone flag.
1200
1201         If the clone flag is set, then a duplicated message, with larger payload
1202         size, is allocated and returned.  The old_msg pointer in this situation is
1203         still valid and must be explicitly freed by the application. If the clone
1204         message is not set (0), then any memory management of the old message is
1205         handled by the function.
1206
1207         If the copy flag is set, the contents of the old message's payload is
1208         copied to the reallocated payload.  If the flag is not set, then the
1209         contents of the payload is undetermined.
1210 */
1211 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1212         if( old_msg == NULL ) {
1213                 return NULL;
1214         }
1215
1216         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1217 }
1218
1219 /*
1220         Enable low latency things in the transport (when supported).
1221 */
1222 extern void rmr_set_low_latency( void* vctx ) {
1223         uta_ctx_t*      ctx;
1224
1225         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1226                 if( ctx->si_ctx != NULL ) {
1227                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1228                 }
1229         }
1230 }
1231
1232 /*
1233         Turn on fast acks.
1234 */
1235 extern void rmr_set_fack( void* vctx ) {
1236         uta_ctx_t*      ctx;
1237
1238         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1239                 if( ctx->si_ctx != NULL ) {
1240                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1241                 }
1242         }
1243 }
1244