e50a8dfcf9e8ed35783c824df8b8117428822250
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2021 Nokia
5         Copyright (c) 2018-2021 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59
60 #define SI95_BUILD      1                       // we drop some common functions for si
61
62 #include "rmr.h"                                // things the users see
63 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
64 #include "rmr_si_private.h"             // things that we need too
65
66 #include "rmr_symtab.h"
67 #include "rmr_logging.h"
68
69 #include "ring_static.c"                        // message ring support
70 #include "rt_generic_static.c"          // route table things not transport specific
71 #include "rtable_si_static.c"           // route table things -- transport specific
72 #include "alarm.c"
73 #include "rtc_static.c"                         // route table collector (thread code)
74 #include "tools_static.c"
75 #include "sr_si_static.c"                       // send/receive static functions
76 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
77 #include "mt_call_static.c"
78 #include "mt_call_si_static.c"
79 #include "rmr_debug_si.c"           // debuging functions
80
81
82 //------------------------------------------------------------------------------
83
84 /*
85         If we have an EP, up the counters based on state.
86         This isn't needed, but it makes driving the code under unit test easier so we
87         induldge in the bean counter's desire for coverage numbers.
88 */
89 static inline void incr_ep_counts( int state, endpoint_t* ep ) {
90         if( ep != NULL ) {
91                 switch( state ) {
92                         case RMR_OK:
93                                 ep->scounts[EPSC_GOOD]++;
94                                 break;
95
96                         case RMR_ERR_RETRY:
97                                 ep->scounts[EPSC_TRANS]++;
98                                 break;
99
100                         default:
101                                 ep->scounts[EPSC_FAIL]++;
102                                 break;
103                 }
104         }
105 }
106
107 /*
108         Clean up a context.
109 */
110 static void free_ctx( uta_ctx_t* ctx ) {
111         if( ctx ) {
112                 if( ctx->rtg_addr ){
113                         free( ctx->rtg_addr );
114                 }
115                 uta_ring_free( ctx->mring );
116                 uta_ring_free( ctx->zcb_mring );
117                 if( ctx->chutes ){
118                         free( ctx->chutes );
119                 }
120                 if( ctx->fd2ep ){
121                         rmr_sym_free( ctx->fd2ep );
122                 }
123                 if( ctx->my_name ){
124                         free( ctx->my_name );
125                 }
126                 if( ctx->my_ip ){
127                         free( ctx->my_ip );
128                 }
129                 if( ctx->rtable ){
130                         rmr_sym_free( ctx->rtable->hash );
131                         free( ctx->rtable );
132                 }
133                 if ( ctx->ephash ){
134                         free( ctx->ephash );
135                 }
136                 free( ctx );
137         }
138 }
139
140 // --------------- public functions --------------------------------------------------------------------------
141
142 /*
143         Returns the size of the payload (bytes) that the msg buffer references.
144         Len in a message is the number of bytes which were received, or should
145         be transmitted, however, it is possible that the mbuf was allocated
146         with a larger payload space than the payload length indicates; this
147         function returns the absolute maximum space that the user has available
148         in the payload. On error (bad msg buffer) -1 is returned and errno should
149         indicate the rason.
150
151         The allocated len stored in the msg is:
152                 transport header length +
153                 message header +
154                 user requested payload
155
156         The msg header is a combination of the fixed RMR header and the variable
157         trace data and d2 fields which may vary for each message.
158 */
159 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
160         if( msg == NULL || msg->header == NULL ) {
161                 errno = EINVAL;
162                 return -1;
163         }
164
165         errno = 0;
166         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
167 }
168
169 /*
170         Allocates a send message as a zerocopy message allowing the underlying message protocol
171         to send the buffer without copy.
172 */
173 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
174         uta_ctx_t*      ctx;
175         rmr_mbuf_t*     m;
176
177         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
178                 return NULL;
179         }
180
181         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
182         return  m;
183 }
184
185
186 /*
187         Allocates a send message as a zerocopy message allowing the underlying message protocol
188         to send the buffer without copy. In addition, a trace data field of tr_size will be
189         added and the supplied data coppied to the buffer before returning the message to
190         the caller.
191 */
192 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
193         uta_ctx_t*      ctx;
194         rmr_mbuf_t*     m;
195         int state;
196
197         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
198                 return NULL;
199         }
200
201         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
202         if( m != NULL ) {
203                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
204                 if( state != tr_size ) {
205                         m->state = RMR_ERR_INITFAILED;
206                 }
207         }
208
209         return  m;
210 }
211
212 /*
213         This provides an external path to the realloc static function as it's called by an
214         outward facing mbuf api function. Used to reallocate a message with a different
215         trace data size.
216
217         User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
218         is still valid following this call. The caller is reponsible for maintainting
219         a pointer to both old and new messages and invoking rmr_free_msg() on both!
220 */
221 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
222         return realloc_msg( msg, new_tr_size );
223 }
224
225
226 /*
227         Return the message to the available pool, or free it outright.
228 */
229 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
230         if( mbuf == NULL ) {
231                 fprintf( stderr, ">>>FREE  nil buffer\n" );
232                 return;
233         }
234
235 #ifdef KEEP
236         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
237                 ! mbuf->ring ||                                                                         // cant cache if no ring
238                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
239
240                 if( mbuf->tp_buf ) {
241                         free( mbuf->tp_buf );
242                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
243                 }
244
245                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
246                 free( mbuf );
247         }
248 #else
249         // always free, never manage a pool
250         if( mbuf->tp_buf ) {
251                 free( mbuf->tp_buf );
252                 mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
253         }
254
255         mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
256         free( mbuf );
257 #endif
258 }
259
260 /*
261         This is a wrapper to the real timeout send. We must wrap it now to ensure that
262         the call flag and call-id are reset
263 */
264 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
265         char* d1;                                                                                                                       // point at the call-id in the header
266
267         if( msg != NULL ) {
268                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
269
270                 d1 = DATA1_ADDR( msg->header );
271                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
272         }
273
274         return mtosend_msg( vctx, msg, max_to );
275 }
276
277 /*
278         Send with default max timeout as is set in the context.
279         See rmr_mtosend_msg() for more details on the parameters.
280         See rmr_stimeout() for info on setting the default timeout.
281 */
282 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
283         char* d1;                                                                                                               // point at the call-id in the header
284
285         if( msg != NULL ) {
286                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
287
288                 d1 = DATA1_ADDR( msg->header );
289                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
290         }
291
292         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
293 }
294
295 /*
296         Return to sender allows a message to be sent back to the endpoint where it originated.
297
298         With SI95 it was thought that the return to sender would be along the same open conneciton
299         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
300         applications sending at high message rates, returning responses on the same connection
301         causes major strife. Thus the decision was made to use the same method as NNG and just
302         open a second connection for reverse path.
303
304         We will attempt to use the name in the received message to look up the endpoint. If
305         that failes, then we will write on the connection that the message arrived on as a
306         falback.
307
308         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
309         and on error it can be passed back to this function to retry the send if desired. On error,
310         errno will liklely have the failure reason set by the nng send processing.  The following
311         are possible values for the state in the message buffer:
312
313         Message states returned:
314                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
315                 RMR_ERR_NOHDR  - message did not have a header
316                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
317                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
318                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
319
320         A nil message as the return value is rare, and generally indicates some kind of horrible
321         failure. The value of errno might give a clue as to what is wrong.
322
323         CAUTION:
324                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
325                 The caller must check for this and handle it properly.
326 */
327 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
328         int                     nn_sock;                        // endpoint socket for send
329         uta_ctx_t*      ctx;
330         char*           hold_src;                       // we need the original source if send fails
331         char*           hold_ip;                        // also must hold original ip
332         int                     sock_ok = 0;            // true if we found a valid endpoint socket
333         endpoint_t*     ep = NULL;                      // end point to track counts
334
335         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
336                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
337                 if( msg != NULL ) {
338                         msg->state = RMR_ERR_BADARG;
339                         msg->tp_state = errno;
340                 }
341                 return msg;
342         }
343
344         errno = 0;                                                                                                              // at this point any bad state is in msg returned
345         if( msg->header == NULL ) {
346                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
347                 msg->state = RMR_ERR_NOHDR;
348                 msg->tp_state = errno;
349                 return msg;
350         }
351
352         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
353
354         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
355         if( ! sock_ok ) {
356                 if( (nn_sock = msg->rts_fd) < 0 ) {
357                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
358                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
359                         }
360                         if( ! sock_ok ) {
361                                 msg->state = RMR_ERR_NOENDPT;
362                                 return msg;
363                         }
364                 }
365         }
366
367         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
368         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
369         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
370         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
371         msg = send_msg( ctx, msg, nn_sock, -1 );
372         if( msg ) {
373                 incr_ep_counts(  msg->state, ep );                              // update counts
374
375                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
376                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
377                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
378         }
379
380         free( hold_src );
381         free( hold_ip );
382         return msg;
383 }
384
385 /*
386         If multi-threading call is turned on, this invokes that mechanism with the special call
387         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
388         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
389         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
390         a flexible timeout.
391
392         On timeout this function will return a nil pointer. If the original message could not
393         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
394
395         Original behavour:
396         Call sends the message based on message routing using the message type, and waits for a
397         response message to arrive with the same transaction id that was in the outgoing message.
398         If, while wiating for the expected response,  messages are received which do not have the
399         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
400         order that they were received.
401
402         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
403         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
404         may be resent (likely the context pointer was nil).  If the message is sent, but no
405         response is received, a nil message is returned with errno set to indicate the likley
406         issue:
407                 ETIMEDOUT -- too many messages were queued before reciving the expected response
408                 ENOBUFS -- the queued message ring is full, messages were dropped
409                 EINVAL  -- A parameter was not valid
410                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
411                                         user should call this function with the message again.
412
413 */
414 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
415         uta_ctx_t*              ctx;
416
417         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
418                 if( msg != NULL ) {
419                         msg->state = RMR_ERR_BADARG;
420                 }
421                 return msg;
422         }
423
424     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
425 }
426
427 /*
428         The outward facing receive function. When invoked it will pop the oldest message
429         from the receive ring, if any are queued, and return it. If the ring is empty
430         then the receive function is invoked to wait for the next message to arrive (blocking).
431
432         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
433         nil, a new one will be allocated. However, the caller should NOT expect to get the same
434         struct back (if a queued message is returned the message struct will be different).
435 */
436 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
437         uta_ctx_t*      ctx;
438         rmr_mbuf_t*     qm;                             // message that was queued on the ring
439
440         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
441                 errno = EINVAL;
442                 if( old_msg != NULL ) {
443                         old_msg->state = RMR_ERR_BADARG;
444                         old_msg->tp_state = errno;
445                 }
446                 return old_msg;
447         }
448         errno = 0;
449
450         return rmr_mt_rcv( ctx, old_msg, -1 );
451 }
452
453 /*
454         This allows a timeout based receive for applications unable to implement epoll_wait()
455         (e.g. wrappers).
456 */
457 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
458         uta_ctx_t*      ctx;
459
460         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
461                 errno = EINVAL;
462                 if( old_msg != NULL ) {
463                         old_msg->state = RMR_ERR_BADARG;
464                         old_msg->tp_state = errno;
465                 }
466                 return old_msg;
467         }
468
469         return rmr_mt_rcv( ctx, old_msg, ms_to );
470 }
471
472 /*
473         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
474                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
475                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
476
477         This blocks until the message with the 'expect' ID is received. Messages which are received
478         before the expected message are queued onto the message ring.  The function will return
479         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
480         expected message is received. If the queued message ring fills a nil pointer is returned
481         and errno is set to ENOBUFS.
482
483         Generally this will be invoked only by the call() function as it waits for a response, but
484         it is exposed to the user application as three is no reason not to.
485 */
486 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
487         uta_ctx_t*      ctx;
488         int     queued = 0;                             // number we pushed into the ring
489         int     exp_len = 0;                    // length of expected ID
490
491         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
492                 errno = EINVAL;
493                 if( msg != NULL ) {
494                         msg->state = RMR_ERR_BADARG;
495                         msg->tp_state = errno;
496                 }
497                 return msg;
498         }
499
500         errno = 0;
501
502         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
503                 return rmr_rcv_msg( ctx, msg );
504         }
505
506         exp_len = strlen( expect );
507         if( exp_len > RMR_MAX_XID ) {
508                 exp_len = RMR_MAX_XID;
509         }
510         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
511
512         while( queued < allow2queue ) {
513                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
514                 if( msg != NULL ) {
515                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
516                         if( msg->state == RMR_OK ) {
517                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
518                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
519                                         return msg;
520                                 }
521
522                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
523                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
524                                         errno = ENOBUFS;
525                                         return NULL;
526                                 }
527
528                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
529                                 queued++;
530                                 msg = NULL;
531                         }
532                 }
533         }
534
535         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
536         errno = ETIMEDOUT;
537         return NULL;
538 }
539
540 /*
541         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
542         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
543         mechnism indicates eagain or etimeedout.  All other error conditions are reported
544         without this delay. Setting a timeout of 0 causes no retries to be attempted in
545         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
546         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
547         after every 1K send attempts until the "time" value is reached. Retries are abandoned
548         if NNG returns anything other than EAGAIN or EINTER is returned.
549
550         The default, if this function is not used, is 1; meaning that RMr will retry, but will
551         not enter a sleep.  In all cases the caller should check the status in the message returned
552         after a send call.
553
554         Returns -1 if the context was invalid; RMR_OK otherwise.
555 */
556 extern int rmr_set_stimeout( void* vctx, int time ) {
557         uta_ctx_t*      ctx;
558
559         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
560                 return -1;
561         }
562
563         if( time < 0 ) {
564                 time = 0;
565         }
566
567         ctx->send_retries = time;
568         return RMR_OK;
569 }
570
571 /*
572         Set receive timeout -- not supported in nng implementation
573
574         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
575 */
576 extern int rmr_set_rtimeout( void* vctx, int time ) {
577         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
578         return 0;
579 }
580
581 /*
582         Common cleanup on initialisation error. These are hard to force, and this helps to ensure
583         all code is tested by providing a callable rather than a block of "goto" code.
584
585         There is a return value so that where we need this we get dinked only for one
586         uncovered line rather than two:
587                 init_err(...);
588                 return NULL;
589
590         That's a hack, and is yet another example of the testing tail wagging the dog.
591 */
592 static inline void* init_err( char* msg, void* ctx, void* port, int errval ) {
593         if( errval != 0 ) {                     // if not letting it be what a sysllib set it to...
594                 errno = errval;
595         }
596
597         if( port ) {                            // free things if allocated
598                 free( port );
599         }
600         if( ctx ) {
601                 free_ctx( ctx );
602         }
603
604         if( msg ) {                                                                     // crit message if supplied
605                 rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) );
606         }
607
608         return NULL;
609 }
610
611 /*
612         This is the actual init workhorse. The user visible function meerly ensures that the
613         calling programme does NOT set any internal flags that are supported, and then
614         invokes this.  Internal functions (the route table collector) which need additional
615         open ports without starting additional route table collectors, will invoke this
616         directly with the proper flag.
617
618         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
619                                 that we know about. The _user_ should ensure that the supplied length also
620                                 includes the trace data length maximum as they are in control of that.
621 */
622 static void* init( char* uproto_port, int def_msg_size, int flags ) {
623         static  int announced = 0;
624         uta_ctx_t*      ctx = NULL;
625         char    bind_info[256];                         // bind info
626         char*   my_ip = NULL;
627         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
628         char*   port;                                           // pointer into the proto_port buffer at the port value
629         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
630         char*   proto_port;
631         char    wbuf[1024];                                     // work buffer
632         char*   tok;                                            // pointer at token in a buffer
633         char*   tok2;
634         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
635         int             state;
636         int             i;
637         int             old_vlevel;
638
639         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
640
641         if( ! announced ) {
642                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
643                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
644                         uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
645                 announced = 1;
646
647                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
648                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
649         }
650
651         errno = 0;
652         if( uproto_port == NULL ) {
653                 proto_port = strdup( DEF_COMM_PORT );
654                 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
655         } else {
656                 proto_port = strdup( uproto_port );             // so we can modify it
657         }
658
659         if ( proto_port == NULL ){
660                 return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM );
661         }
662
663         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
664                 return init_err( "unable to allocate context", ctx, proto_port, ENOMEM );
665         }
666         memset( ctx, 0, sizeof( uta_ctx_t ) );
667
668         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
669         ctx->snarf_rt_fd = -1;
670         ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
671         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
672         ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
673         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
674         for( i = 0; i < ctx->nrivers; i++ ) {
675                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
676         }
677
678         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
679         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
680         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
681         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
682
683         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
684         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
685
686         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
687                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
688                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
689                 uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
690         } else {
691                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
692         }
693         init_mtcall( ctx );                                                             // set up call chutes
694         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
695
696
697         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
698         if( def_msg_size > 0 ) {
699                 ctx->max_plen = def_msg_size;
700         }
701
702         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
703         if( ctx->si_ctx == NULL ) {
704                 return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 );
705         }
706
707         if( (port = strchr( proto_port, ':' )) != NULL ) {
708                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
709                         port++;
710                 } else {
711                         *(port++) = 0;                  // term proto string and point at port string
712                         proto = proto_port;             // user supplied proto so point at it rather than default
713                 }
714         } else {
715                 port = proto_port;                      // assume something like "1234" was passed
716         }
717         rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
718
719         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
720                 static_rtc = 1;
721         }
722
723         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
724                 tok = strdup( tok );                                    // something we can destroy
725                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
726                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
727                 } else {
728                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
729                 }
730                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
731                         *tok2 = 0;                                                      // make it so
732                 }
733
734                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
735                 free( tok );
736         } else {
737                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
738                         return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 );
739                 }
740                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
741                         *tok = 0;                                                                       // we don't keep domain portion
742                 }
743         }
744
745         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
746         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
747                 return init_err( "hostname + port is too long", ctx, proto_port, EINVAL );
748         }
749
750         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
751                 if( atoi( tok ) > 0 ) {
752                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
753                 }
754         }
755
756         ctx->ip_list = mk_ip_list( port );                      // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
757         my_ip = get_default_ip( ctx->ip_list );         // and (guess) at what should be the default to put into messages as src
758         if( my_ip == NULL ) {
759                 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
760                 my_ip = strdup( ctx->my_name );                 // if we cannot suss it out, use the name rather than a nil pointer
761         }
762
763         if( flags & RMRFL_NAME_ONLY ) {
764                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
765         } else {
766                 ctx->my_ip = strdup( my_ip );
767         }
768         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
769
770         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
771                 if( *tok == '1' ) {
772                         ctx->flags |= CFL_WARN;                                 // turn on some warnings (not all, just ones that shouldn't impact performance)
773                 }
774         }
775
776         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all (IPv4, IPv6, or interface name)
777                 /*
778                         compares the first ip sussed out by mk_ip_list (returned by get_default_ip)
779                         NOTE: this might be not work very predictable in dual-stack where an interface can have IPv4 and IPv6 addresses assigned,
780                         meaning that it can select either IPv4 or IPv6 on applications restarts (depends on the order of IP addresses assigned on the interface)
781                 */
782                 if( my_ip[0] == '[' ) {                 // IPv6
783                         interface = "[::]";
784                 } else {                                                // IPv4
785                         interface = "0.0.0.0";
786                         if( flags & RMRFL_NAME_ONLY ) { // if hostname is given instead of IP in RMR source address
787                                 rmr_vlog( RMR_VL_WARN, "rmr_init: hostname:ip is provided as source information for rts() calls, falling back to any IPv4\n" );
788                         }
789                 }
790         }
791
792         if( my_ip != NULL ) {
793                 free( my_ip );
794         }
795
796         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
797         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
798                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
799                 return init_err( NULL, ctx, proto_port, 0 );
800         }
801
802                                                                                                 // finish all flag setting before threads to keep helgrind quiet
803         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
804
805
806         // ---------------- setup for route table collector before invoking ----------------------------------
807         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
808         if( ctx->rtgate != NULL ) {
809                 pthread_mutex_init( ctx->rtgate, NULL );
810         }
811
812         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
813         if( ctx->ephash == NULL ) {
814                 return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM );
815         }
816
817         pthread_mutex_destroy(ctx->rtgate);
818         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
819         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
820                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
821         } else {
822                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
823
824                 if( static_rtc ) {
825                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
826                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
827                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
828                         }
829                 } else {
830                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
831                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
832                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
833                         }
834                 }
835         }
836
837         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
838                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
839         }
840
841         free( proto_port );
842         return (void *) ctx;
843 }
844
845 /*
846         Initialise the message routing environment. Flags are one of the UTAFL_
847         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
848         (tcp) to be used, then :port is all that is needed.
849
850         At the moment it seems that TCP really is the only viable protocol, but
851         we'll allow flexibility.
852
853         The return value is a void pointer which must be passed to most uta functions. On
854         error, a nil pointer is returned and errno should be set.
855
856         Flags:
857                 No user flags supported (needed) at the moment, but this provides for extension
858                 without drastically changing anything. The user should invoke with RMRFL_NONE to
859                 avoid any misbehavour as there are internal flags which are suported
860 */
861 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
862         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
863 }
864
865 /*
866         This sets the default trace length which will be added to any message buffers
867         allocated.  It can be set at any time, and if rmr_set_trace() is given a
868         trace len that is different than the default allcoated in a message, the message
869         will be resized.
870
871         Returns 0 on failure and 1 on success. If failure, then errno will be set.
872 */
873 extern int rmr_init_trace( void* vctx, int tr_len ) {
874         uta_ctx_t* ctx;
875
876         errno = 0;
877         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
878                 errno = EINVAL;
879                 return 0;
880         }
881
882         ctx->trace_data_len = tr_len;
883         return 1;
884 }
885
886 /*
887         Return true if routing table is initialised etc. and app can send/receive.
888 */
889 extern int rmr_ready( void* vctx ) {
890         uta_ctx_t *ctx;
891
892         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
893                 return FALSE;
894         }
895
896         return ctx->rtable_ready;
897 }
898
899 /*
900         This returns the message queue ring's filedescriptor which can be used for
901         calls to epoll.  The user shouild NOT read, write, or close the fd.
902
903         Returns the file descriptor or -1 on error.
904 */
905 extern int rmr_get_rcvfd( void* vctx ) {
906         uta_ctx_t* ctx;
907         int state;
908
909         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
910                 return -1;
911         }
912
913         return uta_ring_getpfd( ctx->mring );
914 }
915
916
917 /*
918         Clean up things.
919
920         There isn't an si_flush() per se, but we can pause, generate
921         a context switch, which should allow the last sent buffer to
922         flow. There isn't exactly an nng_term/close either, so there
923         isn't much we can do.
924 */
925 extern void rmr_close( void* vctx ) {
926         uta_ctx_t *ctx;
927
928         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
929                 return;
930         }
931
932         if( ctx->seed_rt_fname != NULL ) {
933                 free( ctx->seed_rt_fname );
934         }
935
936         ctx->shutdown = 1;
937
938         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
939
940         // FIX ME -- how to we turn off si; close all sessions etc?
941         //SIclose( ctx->nn_sock );
942
943 }
944
945
946 // ----- multi-threaded call/receive support -------------------------------------------------
947
948 /*
949         Blocks on the receive ring chute semaphore and then reads from the ring
950         when it is tickled.  If max_wait is -1 then the function blocks until
951         a message is ready on the ring. Else max_wait is assumed to be the number
952         of millaseconds to wait before returning a timeout message.
953 */
954 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
955         uta_ctx_t*      ctx;
956         chute_t*        chute;
957         struct timespec ts;                     // time info if we have a timeout
958         long    new_ms;                         // adjusted mu-sec
959         long    seconds = 0;            // max wait seconds
960         long    nano_sec;                       // max wait xlated to nano seconds
961         int             state;
962         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
963
964         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
965                 errno = EINVAL;
966                 if( mbuf ) {
967                         mbuf->state = RMR_ERR_BADARG;
968                         mbuf->tp_state = errno;
969                 }
970                 return mbuf;
971         }
972
973         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
974
975         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
976
977         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
978                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
979                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
980                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
981                         if( ombuf ) {
982                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
983                         }
984                 } else {
985                         mbuf = ombuf;                                           // return original if it was given with timeout status
986                         if( ombuf != NULL ) {
987                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
988                                 mbuf->len = 0;
989                         }
990                 }
991
992                 if( mbuf != NULL ) {
993                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
994                 }
995
996                 return mbuf;
997         }
998
999         if( ombuf ) {
1000                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
1001                 ombuf->len = 0;
1002         }
1003         if( max_wait > 0 ) {
1004                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
1005
1006                 if( max_wait > 999 ) {
1007                         seconds = max_wait / 1000;
1008                         max_wait -= seconds * 1000;
1009                         ts.tv_sec += seconds;
1010                 }
1011                 if( max_wait > 0 ) {
1012                         nano_sec = max_wait * 1000000;
1013                         ts.tv_nsec += nano_sec;
1014                         if( ts.tv_nsec > 999999999 ) {
1015                                 ts.tv_nsec -= 999999999;
1016                                 ts.tv_sec++;
1017                         }
1018                 }
1019
1020                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
1021         }
1022
1023         errno = EINTR;
1024         state = -1;
1025         while( state < 0 && errno == EINTR ) {
1026                 if( seconds ) {
1027                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1028                 } else {
1029                         state = sem_wait( &chute->barrier );
1030                 }
1031         }
1032
1033         if( state < 0 ) {
1034                 mbuf = ombuf;                           // return caller's buffer if they passed one in
1035         } else {
1036                 errno = 0;                                              // interrupted call state could be left; clear
1037                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
1038                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
1039                         mbuf->state = RMR_OK;
1040                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
1041
1042                         if( ombuf ) {
1043                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
1044                         }
1045                 } else {
1046                         errno = ETIMEDOUT;
1047                         mbuf = ombuf;                           // no buffer, return user's if there
1048                 }
1049         }
1050
1051         if( mbuf ) {
1052                 mbuf->tp_state = errno;
1053         }
1054         return mbuf;
1055 }
1056
1057
1058
1059
1060 /*
1061         This is the work horse for the multi-threaded call() function. It supports
1062         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
1063         for for rmr_mt_call() modulo the caveat below.
1064
1065         If endpoint is given, then we assume that we're not doing normal route table
1066         routing and that we should send directly to that endpoint (probably worm
1067         hole).
1068 */
1069 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1070         rmr_mbuf_t* ombuf;                      // original mbuf passed in
1071         uta_ctx_t*      ctx;
1072         uta_mhdr_t*     hdr;                    // header in the transport buffer
1073         chute_t*        chute;
1074         unsigned char*  d1;                     // d1 data in header
1075         struct timespec ts;                     // time info if we have a timeout
1076         long    new_ms;                         // adjusted mu-sec
1077         long    seconds = 0;            // max wait seconds
1078         long    nano_sec;                       // max wait xlated to nano seconds
1079         int             state;
1080
1081         errno = EINVAL;
1082         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1083                 if( mbuf ) {
1084                         mbuf->tp_state = errno;
1085                         mbuf->state = RMR_ERR_BADARG;
1086                 }
1087                 return mbuf;
1088         }
1089
1090         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1091                 mbuf->state = RMR_ERR_NOTSUPP;
1092                 mbuf->tp_state = errno;
1093                 return mbuf;
1094         }
1095
1096         ombuf = mbuf;                                                                                                   // save to return timeout status with
1097
1098         chute = &ctx->chutes[call_id];
1099         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1100                 rmr_free_msg( chute->mbuf );
1101                 chute->mbuf = NULL;
1102         }
1103
1104         hdr = (uta_mhdr_t *) mbuf->header;
1105         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1106         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1107         d1 = DATA1_ADDR( hdr );
1108         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1109         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1110
1111         if( max_wait >= 0 ) {
1112                 clock_gettime( CLOCK_REALTIME, &ts );
1113
1114                 if( max_wait > 999 ) {
1115                         seconds = max_wait / 1000;
1116                         max_wait -= seconds * 1000;
1117                         ts.tv_sec += seconds;
1118                 }
1119                 if( max_wait > 0 ) {
1120                         nano_sec = max_wait * 1000000;
1121                         ts.tv_nsec += nano_sec;
1122                         if( ts.tv_nsec > 999999999 ) {
1123                                 ts.tv_nsec -= 999999999;
1124                                 ts.tv_sec++;
1125                         }
1126                 }
1127
1128                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1129         }
1130
1131         if( ep == NULL ) {                                                                              // normal routing
1132                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1133         } else {
1134                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1135         }
1136         if( mbuf ) {
1137                 if( mbuf->state != RMR_OK ) {
1138                         mbuf->tp_state = errno;
1139                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1140                 }
1141         }
1142
1143         state = 0;
1144         errno = 0;
1145         while( chute->mbuf == NULL && ! errno ) {
1146                 if( seconds ) {
1147                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1148                 } else {
1149                         state = sem_wait( &chute->barrier );
1150                 }
1151
1152                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1153                         errno = 0;
1154                 }
1155
1156                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1157                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1158                                 rmr_free_msg( chute->mbuf );
1159                                 chute->mbuf = NULL;
1160                                 errno = 0;
1161                         }
1162                 }
1163         }
1164
1165         if( state < 0 ) {
1166                 return NULL;                                    // leave errno as set by sem wait call
1167         }
1168
1169         mbuf = chute->mbuf;
1170         if( mbuf != NULL ) {
1171                 mbuf->state = RMR_OK;
1172         }
1173         chute->mbuf = NULL;
1174
1175         return mbuf;
1176 }
1177
1178 /*
1179         Accept a message buffer and caller ID, send the message and then wait
1180         for the receiver to tickle the semaphore letting us know that a message
1181         has been received. The call_id is a value between 2 and 255, inclusive; if
1182         it's not in this range an error will be returned. Max wait is the amount
1183         of time in millaseconds that the call should block for. If 0 is given
1184         then no timeout is set.
1185
1186         If the mt_call feature has not been initialised, then the attempt to use this
1187         funciton will fail with RMR_ERR_NOTSUPP
1188
1189         If no matching message is received before the max_wait period expires, a
1190         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1191         occurs after the message has been sent, then a nil pointer is returned
1192         with errno set to some other value.
1193
1194         This is now just an outward facing wrapper so we can support wormhole calls.
1195 */
1196 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1197
1198         // must vet call_id here, all others vetted by workhorse mt_call() function
1199         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1200                 if( mbuf != NULL ) {
1201                         mbuf->state = RMR_ERR_BADARG;
1202                         mbuf->tp_state = EINVAL;
1203                 }
1204                 return mbuf;
1205         }
1206
1207         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1208 }
1209
1210
1211 /*
1212         Given an existing message buffer, reallocate the payload portion to
1213         be at least new_len bytes.  The message header will remain such that
1214         the caller may use the rmr_rts_msg() function to return a payload
1215         to the sender.
1216
1217         The mbuf passed in may or may not be reallocated and the caller must
1218         use the returned pointer and should NOT assume that it can use the
1219         pointer passed in with the exceptions based on the clone flag.
1220
1221         If the clone flag is set, then a duplicated message, with larger payload
1222         size, is allocated and returned.  The old_msg pointer in this situation is
1223         still valid and must be explicitly freed by the application. If the clone
1224         message is not set (0), then any memory management of the old message is
1225         handled by the function.
1226
1227         If the copy flag is set, the contents of the old message's payload is
1228         copied to the reallocated payload.  If the flag is not set, then the
1229         contents of the payload is undetermined.
1230 */
1231 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1232         if( old_msg == NULL ) {
1233                 return NULL;
1234         }
1235
1236         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1237 }
1238
1239 /*
1240         Enable low latency things in the transport (when supported).
1241 */
1242 extern void rmr_set_low_latency( void* vctx ) {
1243         uta_ctx_t*      ctx;
1244
1245         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1246                 if( ctx->si_ctx != NULL ) {
1247                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1248                 }
1249         }
1250 }
1251
1252 /*
1253         Turn on fast acks.
1254 */
1255 extern void rmr_set_fack( void* vctx ) {
1256         uta_ctx_t*      ctx;
1257
1258         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1259                 if( ctx->si_ctx != NULL ) {
1260                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1261                 }
1262         }
1263 }
1264