Correct excessive TCP connection bug
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "rtc_static.c"                         // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c"                       // send/receive static functions
73 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
76
77
78 //------------------------------------------------------------------------------
79
80
81 /*
82         Clean up a context.
83 */
84 static void free_ctx( uta_ctx_t* ctx ) {
85         if( ctx ) {
86                 if( ctx->rtg_addr ){
87                         free( ctx->rtg_addr );
88                 }
89                 uta_ring_free( ctx->mring );
90                 uta_ring_free( ctx->zcb_mring );
91                 if( ctx->chutes ){
92                         free( ctx->chutes );
93                 }
94                 if( ctx->fd2ep ){
95                         rmr_sym_free( ctx->fd2ep );
96                 }
97                 if( ctx->my_name ){
98                         free( ctx->my_name );
99                 }
100                 if( ctx->my_ip ){
101                         free( ctx->my_ip );
102                 }
103                 if( ctx->rtable ){
104                         rmr_sym_free( ctx->rtable->hash );
105                         free( ctx->rtable );
106                 }
107                 if ( ctx->ephash ){
108                         free( ctx->ephash );
109                 }
110                 free( ctx );
111         }
112 }
113
114 // --------------- public functions --------------------------------------------------------------------------
115
116 /*
117         Returns the size of the payload (bytes) that the msg buffer references.
118         Len in a message is the number of bytes which were received, or should
119         be transmitted, however, it is possible that the mbuf was allocated
120         with a larger payload space than the payload length indicates; this
121         function returns the absolute maximum space that the user has available
122         in the payload. On error (bad msg buffer) -1 is returned and errno should
123         indicate the rason.
124
125         The allocated len stored in the msg is:
126                 transport header length +
127                 message header +
128                 user requested payload
129
130         The msg header is a combination of the fixed RMR header and the variable
131         trace data and d2 fields which may vary for each message.
132 */
133 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
134         if( msg == NULL || msg->header == NULL ) {
135                 errno = EINVAL;
136                 return -1;
137         }
138
139         errno = 0;
140         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
141 }
142
143 /*
144         Allocates a send message as a zerocopy message allowing the underlying message protocol
145         to send the buffer without copy.
146 */
147 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
148         uta_ctx_t*      ctx;
149         rmr_mbuf_t*     m;
150
151         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
152                 return NULL;
153         }
154
155         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
156         return  m;
157 }
158
159
160 /*
161         Allocates a send message as a zerocopy message allowing the underlying message protocol
162         to send the buffer without copy. In addition, a trace data field of tr_size will be
163         added and the supplied data coppied to the buffer before returning the message to
164         the caller.
165 */
166 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
167         uta_ctx_t*      ctx;
168         rmr_mbuf_t*     m;
169         int state;
170
171         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
172                 return NULL;
173         }
174
175         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
176         if( m != NULL ) {
177                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
178                 if( state != tr_size ) {
179                         m->state = RMR_ERR_INITFAILED;
180                 }
181         }
182
183         return  m;
184 }
185
186 /*
187         This provides an external path to the realloc static function as it's called by an
188         outward facing mbuf api function. Used to reallocate a message with a different
189         trace data size.
190
191         User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
192         is still valid following this call. The caller is reponsible for maintainting
193         a pointer to both old and new messages and invoking rmr_free_msg() on both!
194 */
195 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
196         return realloc_msg( msg, new_tr_size );
197 }
198
199
200 /*
201         Return the message to the available pool, or free it outright.
202 */
203 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
204         if( mbuf == NULL ) {
205                 fprintf( stderr, ">>>FREE  nil buffer\n" );
206                 return;
207         }
208
209 #ifdef KEEP
210         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
211                 ! mbuf->ring ||                                                                         // cant cache if no ring
212                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
213
214                 if( mbuf->tp_buf ) {
215                         free( mbuf->tp_buf );
216                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
217                 }
218
219                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
220                 free( mbuf );
221         }
222 #else
223         // always free, never manage a pool
224         if( mbuf->tp_buf ) {
225                 free( mbuf->tp_buf );
226                 mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
227         }
228
229         mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
230         free( mbuf );
231 #endif
232 }
233
234 /*
235         This is a wrapper to the real timeout send. We must wrap it now to ensure that
236         the call flag and call-id are reset
237 */
238 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
239         char* d1;                                                                                                                       // point at the call-id in the header
240
241         if( msg != NULL ) {
242                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
243
244                 d1 = DATA1_ADDR( msg->header );
245                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
246         }
247
248         return mtosend_msg( vctx, msg, max_to );
249 }
250
251 /*
252         Send with default max timeout as is set in the context.
253         See rmr_mtosend_msg() for more details on the parameters.
254         See rmr_stimeout() for info on setting the default timeout.
255 */
256 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
257         char* d1;                                                                                                               // point at the call-id in the header
258
259         if( msg != NULL ) {
260                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
261
262                 d1 = DATA1_ADDR( msg->header );
263                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
264         }
265
266         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
267 }
268
269 /*
270         Return to sender allows a message to be sent back to the endpoint where it originated.
271
272         With SI95 it was thought that the return to sender would be along the same open conneciton
273         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
274         applications sending at high message rates, returning responses on the same connection
275         causes major strife. Thus the decision was made to use the same method as NNG and just
276         open a second connection for reverse path.
277
278         We will attempt to use the name in the received message to look up the endpoint. If
279         that failes, then we will write on the connection that the message arrived on as a
280         falback.
281
282         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
283         and on error it can be passed back to this function to retry the send if desired. On error,
284         errno will liklely have the failure reason set by the nng send processing.  The following
285         are possible values for the state in the message buffer:
286
287         Message states returned:
288                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
289                 RMR_ERR_NOHDR  - message did not have a header
290                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
291                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
292                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
293
294         A nil message as the return value is rare, and generally indicates some kind of horrible
295         failure. The value of errno might give a clue as to what is wrong.
296
297         CAUTION:
298                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
299                 The caller must check for this and handle it properly.
300 */
301 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
302         int                     nn_sock;                        // endpoint socket for send
303         uta_ctx_t*      ctx;
304         char*           hold_src;                       // we need the original source if send fails
305         char*           hold_ip;                        // also must hold original ip
306         int                     sock_ok = 0;            // true if we found a valid endpoint socket
307         endpoint_t*     ep = NULL;                      // end point to track counts
308
309         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
310                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
311                 if( msg != NULL ) {
312                         msg->state = RMR_ERR_BADARG;
313                         msg->tp_state = errno;
314                 }
315                 return msg;
316         }
317
318         errno = 0;                                                                                                              // at this point any bad state is in msg returned
319         if( msg->header == NULL ) {
320                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
321                 msg->state = RMR_ERR_NOHDR;
322                 msg->tp_state = errno;
323                 return msg;
324         }
325
326         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
327
328         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
329         if( ! sock_ok ) {
330                 if( (nn_sock = msg->rts_fd) < 0 ) {
331                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
332                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
333                         }
334                         if( ! sock_ok ) {
335                                 msg->state = RMR_ERR_NOENDPT;
336                                 return msg;
337                         }
338                 }
339         }
340
341
342         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
343         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
344         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
345         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
346         msg = send_msg( ctx, msg, nn_sock, -1 );
347         if( msg ) {
348                 if( ep != NULL ) {
349                         switch( msg->state ) {
350                                 case RMR_OK:
351                                         ep->scounts[EPSC_GOOD]++;
352                                         break;
353
354                                 case RMR_ERR_RETRY:
355                                         ep->scounts[EPSC_TRANS]++;
356                                         break;
357
358                                 default:
359                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
360                                         ep->scounts[EPSC_FAIL]++;
361                                         break;
362                         }
363                 }
364                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
365                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
366                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
367         }
368
369         free( hold_src );
370         free( hold_ip );
371         return msg;
372 }
373
374 /*
375         If multi-threading call is turned on, this invokes that mechanism with the special call
376         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
377         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
378         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
379         a flexible timeout.
380
381         On timeout this function will return a nil pointer. If the original message could not
382         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
383
384         Original behavour:
385         Call sends the message based on message routing using the message type, and waits for a
386         response message to arrive with the same transaction id that was in the outgoing message.
387         If, while wiating for the expected response,  messages are received which do not have the
388         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
389         order that they were received.
390
391         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
392         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
393         may be resent (likely the context pointer was nil).  If the message is sent, but no
394         response is received, a nil message is returned with errno set to indicate the likley
395         issue:
396                 ETIMEDOUT -- too many messages were queued before reciving the expected response
397                 ENOBUFS -- the queued message ring is full, messages were dropped
398                 EINVAL  -- A parameter was not valid
399                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
400                                         user should call this function with the message again.
401
402 */
403 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
404         uta_ctx_t*              ctx;
405
406         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
407                 if( msg != NULL ) {
408                         msg->state = RMR_ERR_BADARG;
409                 }
410                 return msg;
411         }
412
413     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
414 }
415
416 /*
417         The outward facing receive function. When invoked it will pop the oldest message
418         from the receive ring, if any are queued, and return it. If the ring is empty
419         then the receive function is invoked to wait for the next message to arrive (blocking).
420
421         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
422         nil, a new one will be allocated. However, the caller should NOT expect to get the same
423         struct back (if a queued message is returned the message struct will be different).
424 */
425 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
426         uta_ctx_t*      ctx;
427         rmr_mbuf_t*     qm;                             // message that was queued on the ring
428
429         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
430                 errno = EINVAL;
431                 if( old_msg != NULL ) {
432                         old_msg->state = RMR_ERR_BADARG;
433                         old_msg->tp_state = errno;
434                 }
435                 return old_msg;
436         }
437         errno = 0;
438
439         return rmr_mt_rcv( ctx, old_msg, -1 );
440 }
441
442 /*
443         This allows a timeout based receive for applications unable to implement epoll_wait()
444         (e.g. wrappers).
445 */
446 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
447         uta_ctx_t*      ctx;
448
449         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
450                 errno = EINVAL;
451                 if( old_msg != NULL ) {
452                         old_msg->state = RMR_ERR_BADARG;
453                         old_msg->tp_state = errno;
454                 }
455                 return old_msg;
456         }
457
458         return rmr_mt_rcv( ctx, old_msg, ms_to );
459 }
460
461 /*
462         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
463                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
464                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
465
466         This blocks until the message with the 'expect' ID is received. Messages which are received
467         before the expected message are queued onto the message ring.  The function will return
468         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
469         expected message is received. If the queued message ring fills a nil pointer is returned
470         and errno is set to ENOBUFS.
471
472         Generally this will be invoked only by the call() function as it waits for a response, but
473         it is exposed to the user application as three is no reason not to.
474 */
475 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
476         uta_ctx_t*      ctx;
477         int     queued = 0;                             // number we pushed into the ring
478         int     exp_len = 0;                    // length of expected ID
479
480         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
481                 errno = EINVAL;
482                 if( msg != NULL ) {
483                         msg->state = RMR_ERR_BADARG;
484                         msg->tp_state = errno;
485                 }
486                 return msg;
487         }
488
489         errno = 0;
490
491         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
492                 return rmr_rcv_msg( ctx, msg );
493         }
494
495         exp_len = strlen( expect );
496         if( exp_len > RMR_MAX_XID ) {
497                 exp_len = RMR_MAX_XID;
498         }
499         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
500
501         while( queued < allow2queue ) {
502                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
503                 if( msg != NULL ) {
504                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
505                         if( msg->state == RMR_OK ) {
506                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
507                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
508                                         return msg;
509                                 }
510
511                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
512                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
513                                         errno = ENOBUFS;
514                                         return NULL;
515                                 }
516
517                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
518                                 queued++;
519                                 msg = NULL;
520                         }
521                 }
522         }
523
524         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
525         errno = ETIMEDOUT;
526         return NULL;
527 }
528
529 /*
530         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
531         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
532         mechnism indicates eagain or etimeedout.  All other error conditions are reported
533         without this delay. Setting a timeout of 0 causes no retries to be attempted in
534         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
535         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
536         after every 1K send attempts until the "time" value is reached. Retries are abandoned
537         if NNG returns anything other than EAGAIN or EINTER is returned.
538
539         The default, if this function is not used, is 1; meaning that RMr will retry, but will
540         not enter a sleep.  In all cases the caller should check the status in the message returned
541         after a send call.
542
543         Returns -1 if the context was invalid; RMR_OK otherwise.
544 */
545 extern int rmr_set_stimeout( void* vctx, int time ) {
546         uta_ctx_t*      ctx;
547
548         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
549                 return -1;
550         }
551
552         if( time < 0 ) {
553                 time = 0;
554         }
555
556         ctx->send_retries = time;
557         return RMR_OK;
558 }
559
560 /*
561         Set receive timeout -- not supported in nng implementation
562
563         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
564 */
565 extern int rmr_set_rtimeout( void* vctx, int time ) {
566         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
567         return 0;
568 }
569
570
571 /*
572         This is the actual init workhorse. The user visible function meerly ensures that the
573         calling programme does NOT set any internal flags that are supported, and then
574         invokes this.  Internal functions (the route table collector) which need additional
575         open ports without starting additional route table collectors, will invoke this
576         directly with the proper flag.
577
578         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
579                                 that we know about. The _user_ should ensure that the supplied length also
580                                 includes the trace data length maximum as they are in control of that.
581 */
582 static void* init( char* uproto_port, int def_msg_size, int flags ) {
583         static  int announced = 0;
584         uta_ctx_t*      ctx = NULL;
585         char    bind_info[256];                         // bind info
586         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
587         char*   port;                                           // pointer into the proto_port buffer at the port value
588         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
589         char*   proto_port;
590         char    wbuf[1024];                                     // work buffer
591         char*   tok;                                            // pointer at token in a buffer
592         char*   tok2;
593         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
594         int             state;
595         int             i;
596         int             old_vlevel;
597
598         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
599
600         if( ! announced ) {
601                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
602                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
603                         uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
604                 announced = 1;
605
606                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
607                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
608         }
609
610         errno = 0;
611         if( uproto_port == NULL ) {
612                 proto_port = strdup( DEF_COMM_PORT );
613                 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
614         } else {
615                 proto_port = strdup( uproto_port );             // so we can modify it
616         }
617
618         if ( proto_port == NULL ){
619                 errno = ENOMEM;
620                 return NULL;
621         }
622
623         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
624                 errno = ENOMEM;
625                 goto err;
626         }
627         memset( ctx, 0, sizeof( uta_ctx_t ) );
628
629         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
630         ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
631         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
632         ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
633         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
634         for( i = 0; i < ctx->nrivers; i++ ) {
635                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
636         }
637
638         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
639         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
640         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
641         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
642
643         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
644         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
645
646         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
647                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
648                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
649                 uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
650         } else {
651                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
652         }
653         init_mtcall( ctx );                                                             // set up call chutes
654         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
655
656
657         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
658         if( def_msg_size > 0 ) {
659                 ctx->max_plen = def_msg_size;
660         }
661
662         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
663         if( ctx->si_ctx == NULL ) {
664                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
665                 goto err;
666         }
667
668         if( (port = strchr( proto_port, ':' )) != NULL ) {
669                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
670                         port++;
671                 } else {
672                         *(port++) = 0;                  // term proto string and point at port string
673                         proto = proto_port;             // user supplied proto so point at it rather than default
674                 }
675         } else {
676                 port = proto_port;                      // assume something like "1234" was passed
677         }
678         rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
679
680         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
681                 static_rtc = 1;
682         }
683
684         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
685                 tok = strdup( tok );                                    // something we can destroy
686                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
687                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
688                 } else {
689                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
690                 }
691                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
692                         *tok2 = 0;                                                      // make it so
693                 }
694
695                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
696                 free( tok );
697         } else {
698                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
699                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
700                         goto err;
701                 }
702                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
703                         *tok = 0;                                                                       // we don't keep domain portion
704                 }
705         }
706
707         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
708         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
709                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
710                 errno = EINVAL;
711                 goto err;
712         }
713
714         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
715                 if( atoi( tok ) > 0 ) {
716                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
717                 }
718         }
719
720         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
721         if( flags & RMRFL_NAME_ONLY ) {
722                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
723         } else {
724                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
725                 if( ctx->my_ip == NULL ) {
726                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
727                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
728                 }
729         }
730         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
731
732         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
733                 if( *tok == '1' ) {
734                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
735                 }
736         }
737
738
739         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
740                 interface = "0.0.0.0";
741         }
742
743         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
744         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
745                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
746                 goto err;
747         }
748
749                                                                                                 // finish all flag setting before threads to keep helgrind quiet
750         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
751
752
753         // ---------------- setup for route table collector before invoking ----------------------------------
754         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
755         if( ctx->rtgate != NULL ) {
756                 pthread_mutex_init( ctx->rtgate, NULL );
757         }
758
759         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
760         if( ctx->ephash == NULL ) {
761                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
762                 errno = ENOMEM;
763                 goto err;
764         }
765
766         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
767         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
768                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
769         } else {
770                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
771
772                 if( static_rtc ) {
773                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
774                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
775                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
776                         }
777                 } else {
778                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
779                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
780                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
781                         }
782                 }
783         }
784
785         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
786                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
787         }
788
789         free( proto_port );
790         return (void *) ctx;
791
792 err:
793         free( proto_port );
794         free_ctx( ctx );
795         return NULL;
796 }
797
798 /*
799         Initialise the message routing environment. Flags are one of the UTAFL_
800         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
801         (tcp) to be used, then :port is all that is needed.
802
803         At the moment it seems that TCP really is the only viable protocol, but
804         we'll allow flexibility.
805
806         The return value is a void pointer which must be passed to most uta functions. On
807         error, a nil pointer is returned and errno should be set.
808
809         Flags:
810                 No user flags supported (needed) at the moment, but this provides for extension
811                 without drastically changing anything. The user should invoke with RMRFL_NONE to
812                 avoid any misbehavour as there are internal flags which are suported
813 */
814 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
815         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
816 }
817
818 /*
819         This sets the default trace length which will be added to any message buffers
820         allocated.  It can be set at any time, and if rmr_set_trace() is given a
821         trace len that is different than the default allcoated in a message, the message
822         will be resized.
823
824         Returns 0 on failure and 1 on success. If failure, then errno will be set.
825 */
826 extern int rmr_init_trace( void* vctx, int tr_len ) {
827         uta_ctx_t* ctx;
828
829         errno = 0;
830         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
831                 errno = EINVAL;
832                 return 0;
833         }
834
835         ctx->trace_data_len = tr_len;
836         return 1;
837 }
838
839 /*
840         Return true if routing table is initialised etc. and app can send/receive.
841 */
842 extern int rmr_ready( void* vctx ) {
843         uta_ctx_t *ctx;
844
845         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
846                 return FALSE;
847         }
848
849         return ctx->rtable_ready;
850 }
851
852 /*
853         This returns the message queue ring's filedescriptor which can be used for
854         calls to epoll.  The user shouild NOT read, write, or close the fd.
855
856         Returns the file descriptor or -1 on error.
857 */
858 extern int rmr_get_rcvfd( void* vctx ) {
859         uta_ctx_t* ctx;
860         int state;
861
862         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
863                 return -1;
864         }
865
866         return uta_ring_getpfd( ctx->mring );
867 }
868
869
870 /*
871         Clean up things.
872
873         There isn't an si_flush() per se, but we can pause, generate
874         a context switch, which should allow the last sent buffer to
875         flow. There isn't exactly an nng_term/close either, so there
876         isn't much we can do.
877 */
878 extern void rmr_close( void* vctx ) {
879         uta_ctx_t *ctx;
880
881         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
882                 return;
883         }
884
885         ctx->shutdown = 1;
886
887         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
888
889         // FIX ME -- how to we turn off si; close all sessions etc?
890         //SIclose( ctx->nn_sock );
891
892 }
893
894
895 // ----- multi-threaded call/receive support -------------------------------------------------
896
897 /*
898         Blocks on the receive ring chute semaphore and then reads from the ring
899         when it is tickled.  If max_wait is -1 then the function blocks until
900         a message is ready on the ring. Else max_wait is assumed to be the number
901         of millaseconds to wait before returning a timeout message.
902 */
903 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
904         uta_ctx_t*      ctx;
905         chute_t*        chute;
906         struct timespec ts;                     // time info if we have a timeout
907         long    new_ms;                         // adjusted mu-sec
908         long    seconds = 0;            // max wait seconds
909         long    nano_sec;                       // max wait xlated to nano seconds
910         int             state;
911         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
912
913         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
914                 errno = EINVAL;
915                 if( mbuf ) {
916                         mbuf->state = RMR_ERR_BADARG;
917                         mbuf->tp_state = errno;
918                 }
919                 return mbuf;
920         }
921
922         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
923
924         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
925
926         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
927                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
928                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
929                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
930                         if( ombuf ) {
931                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
932                         }
933                 } else {
934                         mbuf = ombuf;                                           // return original if it was given with timeout status
935                         if( ombuf != NULL ) {
936                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
937                                 mbuf->len = 0;
938                         }
939                 }
940
941                 if( mbuf != NULL ) {
942                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
943                 }
944
945                 return mbuf;
946         }
947
948         if( ombuf ) {
949                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
950                 ombuf->len = 0;
951         }
952         if( max_wait > 0 ) {
953                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
954
955                 if( max_wait > 999 ) {
956                         seconds = max_wait / 1000;
957                         max_wait -= seconds * 1000;
958                         ts.tv_sec += seconds;
959                 }
960                 if( max_wait > 0 ) {
961                         nano_sec = max_wait * 1000000;
962                         ts.tv_nsec += nano_sec;
963                         if( ts.tv_nsec > 999999999 ) {
964                                 ts.tv_nsec -= 999999999;
965                                 ts.tv_sec++;
966                         }
967                 }
968
969                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
970         }
971
972         errno = EINTR;
973         state = -1;
974         while( state < 0 && errno == EINTR ) {
975                 if( seconds ) {
976                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
977                 } else {
978                         state = sem_wait( &chute->barrier );
979                 }
980         }
981
982         if( state < 0 ) {
983                 mbuf = ombuf;                           // return caller's buffer if they passed one in
984         } else {
985                 errno = 0;                                              // interrupted call state could be left; clear
986                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
987                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
988                         mbuf->state = RMR_OK;
989                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
990
991                         if( ombuf ) {
992                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
993                         }
994                 } else {
995                         errno = ETIMEDOUT;
996                         mbuf = ombuf;                           // no buffer, return user's if there
997                 }
998         }
999
1000         if( mbuf ) {
1001                 mbuf->tp_state = errno;
1002         }
1003         return mbuf;
1004 }
1005
1006
1007
1008
1009 /*
1010         This is the work horse for the multi-threaded call() function. It supports
1011         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
1012         for for rmr_mt_call() modulo the caveat below.
1013
1014         If endpoint is given, then we assume that we're not doing normal route table
1015         routing and that we should send directly to that endpoint (probably worm
1016         hole).
1017 */
1018 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1019         rmr_mbuf_t* ombuf;                      // original mbuf passed in
1020         uta_ctx_t*      ctx;
1021         uta_mhdr_t*     hdr;                    // header in the transport buffer
1022         chute_t*        chute;
1023         unsigned char*  d1;                     // d1 data in header
1024         struct timespec ts;                     // time info if we have a timeout
1025         long    new_ms;                         // adjusted mu-sec
1026         long    seconds = 0;            // max wait seconds
1027         long    nano_sec;                       // max wait xlated to nano seconds
1028         int             state;
1029
1030         errno = EINVAL;
1031         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1032                 if( mbuf ) {
1033                         mbuf->tp_state = errno;
1034                         mbuf->state = RMR_ERR_BADARG;
1035                 }
1036                 return mbuf;
1037         }
1038
1039         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1040                 mbuf->state = RMR_ERR_NOTSUPP;
1041                 mbuf->tp_state = errno;
1042                 return mbuf;
1043         }
1044
1045         ombuf = mbuf;                                                                                                   // save to return timeout status with
1046
1047         chute = &ctx->chutes[call_id];
1048         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1049                 rmr_free_msg( chute->mbuf );
1050                 chute->mbuf = NULL;
1051         }
1052
1053         hdr = (uta_mhdr_t *) mbuf->header;
1054         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1055         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1056         d1 = DATA1_ADDR( hdr );
1057         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1058         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1059
1060         if( max_wait >= 0 ) {
1061                 clock_gettime( CLOCK_REALTIME, &ts );
1062
1063                 if( max_wait > 999 ) {
1064                         seconds = max_wait / 1000;
1065                         max_wait -= seconds * 1000;
1066                         ts.tv_sec += seconds;
1067                 }
1068                 if( max_wait > 0 ) {
1069                         nano_sec = max_wait * 1000000;
1070                         ts.tv_nsec += nano_sec;
1071                         if( ts.tv_nsec > 999999999 ) {
1072                                 ts.tv_nsec -= 999999999;
1073                                 ts.tv_sec++;
1074                         }
1075                 }
1076
1077                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1078         }
1079
1080         if( ep == NULL ) {                                                                              // normal routing
1081                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1082         } else {
1083                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1084         }
1085         if( mbuf ) {
1086                 if( mbuf->state != RMR_OK ) {
1087                         mbuf->tp_state = errno;
1088                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1089                 }
1090         }
1091
1092         state = 0;
1093         errno = 0;
1094         while( chute->mbuf == NULL && ! errno ) {
1095                 if( seconds ) {
1096                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1097                 } else {
1098                         state = sem_wait( &chute->barrier );
1099                 }
1100
1101                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1102                         errno = 0;
1103                 }
1104
1105                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1106                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1107                                 rmr_free_msg( chute->mbuf );
1108                                 chute->mbuf = NULL;
1109                                 errno = 0;
1110                         }
1111                 }
1112         }
1113
1114         if( state < 0 ) {
1115                 return NULL;                                    // leave errno as set by sem wait call
1116         }
1117
1118         mbuf = chute->mbuf;
1119         if( mbuf != NULL ) {
1120                 mbuf->state = RMR_OK;
1121         }
1122         chute->mbuf = NULL;
1123
1124         return mbuf;
1125 }
1126
1127 /*
1128         Accept a message buffer and caller ID, send the message and then wait
1129         for the receiver to tickle the semaphore letting us know that a message
1130         has been received. The call_id is a value between 2 and 255, inclusive; if
1131         it's not in this range an error will be returned. Max wait is the amount
1132         of time in millaseconds that the call should block for. If 0 is given
1133         then no timeout is set.
1134
1135         If the mt_call feature has not been initialised, then the attempt to use this
1136         funciton will fail with RMR_ERR_NOTSUPP
1137
1138         If no matching message is received before the max_wait period expires, a
1139         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1140         occurs after the message has been sent, then a nil pointer is returned
1141         with errno set to some other value.
1142
1143         This is now just an outward facing wrapper so we can support wormhole calls.
1144 */
1145 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1146
1147         // must vet call_id here, all others vetted by workhorse mt_call() function
1148         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1149                 if( mbuf != NULL ) {
1150                         mbuf->state = RMR_ERR_BADARG;
1151                         mbuf->tp_state = EINVAL;
1152                 }
1153                 return mbuf;
1154         }
1155
1156         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1157 }
1158
1159
1160 /*
1161         Given an existing message buffer, reallocate the payload portion to
1162         be at least new_len bytes.  The message header will remain such that
1163         the caller may use the rmr_rts_msg() function to return a payload
1164         to the sender.
1165
1166         The mbuf passed in may or may not be reallocated and the caller must
1167         use the returned pointer and should NOT assume that it can use the
1168         pointer passed in with the exceptions based on the clone flag.
1169
1170         If the clone flag is set, then a duplicated message, with larger payload
1171         size, is allocated and returned.  The old_msg pointer in this situation is
1172         still valid and must be explicitly freed by the application. If the clone
1173         message is not set (0), then any memory management of the old message is
1174         handled by the function.
1175
1176         If the copy flag is set, the contents of the old message's payload is
1177         copied to the reallocated payload.  If the flag is not set, then the
1178         contents of the payload is undetermined.
1179 */
1180 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1181         if( old_msg == NULL ) {
1182                 return NULL;
1183         }
1184
1185         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1186 }
1187
1188 /*
1189         Enable low latency things in the transport (when supported).
1190 */
1191 extern void rmr_set_low_latency( void* vctx ) {
1192         uta_ctx_t*      ctx;
1193
1194         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1195                 if( ctx->si_ctx != NULL ) {
1196                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1197                 }
1198         }
1199 }
1200
1201 /*
1202         Turn on fast acks.
1203 */
1204 extern void rmr_set_fack( void* vctx ) {
1205         uta_ctx_t*      ctx;
1206
1207         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1208                 if( ctx->si_ctx != NULL ) {
1209                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1210                 }
1211         }
1212 }
1213