Add alarm generation when application is slow
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "alarm.c"
71 #include "rtc_static.c"                         // route table collector (thread code)
72 #include "tools_static.c"
73 #include "sr_si_static.c"                       // send/receive static functions
74 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_si_static.c"
77
78
79 //------------------------------------------------------------------------------
80
81
82 /*
83         Clean up a context.
84 */
85 static void free_ctx( uta_ctx_t* ctx ) {
86         if( ctx ) {
87                 if( ctx->rtg_addr ){
88                         free( ctx->rtg_addr );
89                 }
90                 uta_ring_free( ctx->mring );
91                 uta_ring_free( ctx->zcb_mring );
92                 if( ctx->chutes ){
93                         free( ctx->chutes );
94                 }
95                 if( ctx->fd2ep ){
96                         rmr_sym_free( ctx->fd2ep );
97                 }
98                 if( ctx->my_name ){
99                         free( ctx->my_name );
100                 }
101                 if( ctx->my_ip ){
102                         free( ctx->my_ip );
103                 }
104                 if( ctx->rtable ){
105                         rmr_sym_free( ctx->rtable->hash );
106                         free( ctx->rtable );
107                 }
108                 if ( ctx->ephash ){
109                         free( ctx->ephash );
110                 }
111                 free( ctx );
112         }
113 }
114
115 // --------------- public functions --------------------------------------------------------------------------
116
117 /*
118         Returns the size of the payload (bytes) that the msg buffer references.
119         Len in a message is the number of bytes which were received, or should
120         be transmitted, however, it is possible that the mbuf was allocated
121         with a larger payload space than the payload length indicates; this
122         function returns the absolute maximum space that the user has available
123         in the payload. On error (bad msg buffer) -1 is returned and errno should
124         indicate the rason.
125
126         The allocated len stored in the msg is:
127                 transport header length +
128                 message header +
129                 user requested payload
130
131         The msg header is a combination of the fixed RMR header and the variable
132         trace data and d2 fields which may vary for each message.
133 */
134 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
135         if( msg == NULL || msg->header == NULL ) {
136                 errno = EINVAL;
137                 return -1;
138         }
139
140         errno = 0;
141         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
142 }
143
144 /*
145         Allocates a send message as a zerocopy message allowing the underlying message protocol
146         to send the buffer without copy.
147 */
148 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
149         uta_ctx_t*      ctx;
150         rmr_mbuf_t*     m;
151
152         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
153                 return NULL;
154         }
155
156         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
157         return  m;
158 }
159
160
161 /*
162         Allocates a send message as a zerocopy message allowing the underlying message protocol
163         to send the buffer without copy. In addition, a trace data field of tr_size will be
164         added and the supplied data coppied to the buffer before returning the message to
165         the caller.
166 */
167 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
168         uta_ctx_t*      ctx;
169         rmr_mbuf_t*     m;
170         int state;
171
172         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
173                 return NULL;
174         }
175
176         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
177         if( m != NULL ) {
178                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
179                 if( state != tr_size ) {
180                         m->state = RMR_ERR_INITFAILED;
181                 }
182         }
183
184         return  m;
185 }
186
187 /*
188         This provides an external path to the realloc static function as it's called by an
189         outward facing mbuf api function. Used to reallocate a message with a different
190         trace data size.
191
192         User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
193         is still valid following this call. The caller is reponsible for maintainting
194         a pointer to both old and new messages and invoking rmr_free_msg() on both!
195 */
196 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
197         return realloc_msg( msg, new_tr_size );
198 }
199
200
201 /*
202         Return the message to the available pool, or free it outright.
203 */
204 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
205         if( mbuf == NULL ) {
206                 fprintf( stderr, ">>>FREE  nil buffer\n" );
207                 return;
208         }
209
210 #ifdef KEEP
211         if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
212                 ! mbuf->ring ||                                                                         // cant cache if no ring
213                 ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
214
215                 if( mbuf->tp_buf ) {
216                         free( mbuf->tp_buf );
217                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
218                 }
219
220                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
221                 free( mbuf );
222         }
223 #else
224         // always free, never manage a pool
225         if( mbuf->tp_buf ) {
226                 free( mbuf->tp_buf );
227                 mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
228         }
229
230         mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
231         free( mbuf );
232 #endif
233 }
234
235 /*
236         This is a wrapper to the real timeout send. We must wrap it now to ensure that
237         the call flag and call-id are reset
238 */
239 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
240         char* d1;                                                                                                                       // point at the call-id in the header
241
242         if( msg != NULL ) {
243                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
244
245                 d1 = DATA1_ADDR( msg->header );
246                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
247         }
248
249         return mtosend_msg( vctx, msg, max_to );
250 }
251
252 /*
253         Send with default max timeout as is set in the context.
254         See rmr_mtosend_msg() for more details on the parameters.
255         See rmr_stimeout() for info on setting the default timeout.
256 */
257 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
258         char* d1;                                                                                                               // point at the call-id in the header
259
260         if( msg != NULL ) {
261                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
262
263                 d1 = DATA1_ADDR( msg->header );
264                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
265         }
266
267         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
268 }
269
270 /*
271         Return to sender allows a message to be sent back to the endpoint where it originated.
272
273         With SI95 it was thought that the return to sender would be along the same open conneciton
274         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
275         applications sending at high message rates, returning responses on the same connection
276         causes major strife. Thus the decision was made to use the same method as NNG and just
277         open a second connection for reverse path.
278
279         We will attempt to use the name in the received message to look up the endpoint. If
280         that failes, then we will write on the connection that the message arrived on as a
281         falback.
282
283         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
284         and on error it can be passed back to this function to retry the send if desired. On error,
285         errno will liklely have the failure reason set by the nng send processing.  The following
286         are possible values for the state in the message buffer:
287
288         Message states returned:
289                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
290                 RMR_ERR_NOHDR  - message did not have a header
291                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
292                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
293                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
294
295         A nil message as the return value is rare, and generally indicates some kind of horrible
296         failure. The value of errno might give a clue as to what is wrong.
297
298         CAUTION:
299                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
300                 The caller must check for this and handle it properly.
301 */
302 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
303         int                     nn_sock;                        // endpoint socket for send
304         uta_ctx_t*      ctx;
305         char*           hold_src;                       // we need the original source if send fails
306         char*           hold_ip;                        // also must hold original ip
307         int                     sock_ok = 0;            // true if we found a valid endpoint socket
308         endpoint_t*     ep = NULL;                      // end point to track counts
309
310         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
311                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
312                 if( msg != NULL ) {
313                         msg->state = RMR_ERR_BADARG;
314                         msg->tp_state = errno;
315                 }
316                 return msg;
317         }
318
319         errno = 0;                                                                                                              // at this point any bad state is in msg returned
320         if( msg->header == NULL ) {
321                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
322                 msg->state = RMR_ERR_NOHDR;
323                 msg->tp_state = errno;
324                 return msg;
325         }
326
327         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
328
329         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
330         if( ! sock_ok ) {
331                 if( (nn_sock = msg->rts_fd) < 0 ) {
332                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
333                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
334                         }
335                         if( ! sock_ok ) {
336                                 msg->state = RMR_ERR_NOENDPT;
337                                 return msg;
338                         }
339                 }
340         }
341
342
343         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
344         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
345         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
346         zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
347         msg = send_msg( ctx, msg, nn_sock, -1 );
348         if( msg ) {
349                 if( ep != NULL ) {
350                         switch( msg->state ) {
351                                 case RMR_OK:
352                                         ep->scounts[EPSC_GOOD]++;
353                                         break;
354
355                                 case RMR_ERR_RETRY:
356                                         ep->scounts[EPSC_TRANS]++;
357                                         break;
358
359                                 default:
360                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
361                                         ep->scounts[EPSC_FAIL]++;
362                                         break;
363                         }
364                 }
365                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
366                 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
367                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
368         }
369
370         free( hold_src );
371         free( hold_ip );
372         return msg;
373 }
374
375 /*
376         If multi-threading call is turned on, this invokes that mechanism with the special call
377         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
378         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
379         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
380         a flexible timeout.
381
382         On timeout this function will return a nil pointer. If the original message could not
383         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
384
385         Original behavour:
386         Call sends the message based on message routing using the message type, and waits for a
387         response message to arrive with the same transaction id that was in the outgoing message.
388         If, while wiating for the expected response,  messages are received which do not have the
389         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
390         order that they were received.
391
392         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
393         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
394         may be resent (likely the context pointer was nil).  If the message is sent, but no
395         response is received, a nil message is returned with errno set to indicate the likley
396         issue:
397                 ETIMEDOUT -- too many messages were queued before reciving the expected response
398                 ENOBUFS -- the queued message ring is full, messages were dropped
399                 EINVAL  -- A parameter was not valid
400                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
401                                         user should call this function with the message again.
402
403 */
404 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
405         uta_ctx_t*              ctx;
406
407         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
408                 if( msg != NULL ) {
409                         msg->state = RMR_ERR_BADARG;
410                 }
411                 return msg;
412         }
413
414     return mt_call( vctx, msg, 1, 1000, NULL );         // use the reserved call-id of 1 and wait up to 1 sec
415 }
416
417 /*
418         The outward facing receive function. When invoked it will pop the oldest message
419         from the receive ring, if any are queued, and return it. If the ring is empty
420         then the receive function is invoked to wait for the next message to arrive (blocking).
421
422         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
423         nil, a new one will be allocated. However, the caller should NOT expect to get the same
424         struct back (if a queued message is returned the message struct will be different).
425 */
426 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
427         uta_ctx_t*      ctx;
428         rmr_mbuf_t*     qm;                             // message that was queued on the ring
429
430         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
431                 errno = EINVAL;
432                 if( old_msg != NULL ) {
433                         old_msg->state = RMR_ERR_BADARG;
434                         old_msg->tp_state = errno;
435                 }
436                 return old_msg;
437         }
438         errno = 0;
439
440         return rmr_mt_rcv( ctx, old_msg, -1 );
441 }
442
443 /*
444         This allows a timeout based receive for applications unable to implement epoll_wait()
445         (e.g. wrappers).
446 */
447 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
448         uta_ctx_t*      ctx;
449
450         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
451                 errno = EINVAL;
452                 if( old_msg != NULL ) {
453                         old_msg->state = RMR_ERR_BADARG;
454                         old_msg->tp_state = errno;
455                 }
456                 return old_msg;
457         }
458
459         return rmr_mt_rcv( ctx, old_msg, ms_to );
460 }
461
462 /*
463         DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
464                 too.  This function likely will not behave as expected in SI, and we are pretty sure it
465                 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
466
467         This blocks until the message with the 'expect' ID is received. Messages which are received
468         before the expected message are queued onto the message ring.  The function will return
469         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
470         expected message is received. If the queued message ring fills a nil pointer is returned
471         and errno is set to ENOBUFS.
472
473         Generally this will be invoked only by the call() function as it waits for a response, but
474         it is exposed to the user application as three is no reason not to.
475 */
476 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
477         uta_ctx_t*      ctx;
478         int     queued = 0;                             // number we pushed into the ring
479         int     exp_len = 0;                    // length of expected ID
480
481         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
482                 errno = EINVAL;
483                 if( msg != NULL ) {
484                         msg->state = RMR_ERR_BADARG;
485                         msg->tp_state = errno;
486                 }
487                 return msg;
488         }
489
490         errno = 0;
491
492         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
493                 return rmr_rcv_msg( ctx, msg );
494         }
495
496         exp_len = strlen( expect );
497         if( exp_len > RMR_MAX_XID ) {
498                 exp_len = RMR_MAX_XID;
499         }
500         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
501
502         while( queued < allow2queue ) {
503                 msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
504                 if( msg != NULL ) {
505                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
506                         if( msg->state == RMR_OK ) {
507                                 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
508                                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
509                                         return msg;
510                                 }
511
512                                 if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
513                                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
514                                         errno = ENOBUFS;
515                                         return NULL;
516                                 }
517
518                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
519                                 queued++;
520                                 msg = NULL;
521                         }
522                 }
523         }
524
525         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
526         errno = ETIMEDOUT;
527         return NULL;
528 }
529
530 /*
531         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
532         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
533         mechnism indicates eagain or etimeedout.  All other error conditions are reported
534         without this delay. Setting a timeout of 0 causes no retries to be attempted in
535         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
536         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
537         after every 1K send attempts until the "time" value is reached. Retries are abandoned
538         if NNG returns anything other than EAGAIN or EINTER is returned.
539
540         The default, if this function is not used, is 1; meaning that RMr will retry, but will
541         not enter a sleep.  In all cases the caller should check the status in the message returned
542         after a send call.
543
544         Returns -1 if the context was invalid; RMR_OK otherwise.
545 */
546 extern int rmr_set_stimeout( void* vctx, int time ) {
547         uta_ctx_t*      ctx;
548
549         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
550                 return -1;
551         }
552
553         if( time < 0 ) {
554                 time = 0;
555         }
556
557         ctx->send_retries = time;
558         return RMR_OK;
559 }
560
561 /*
562         Set receive timeout -- not supported in nng implementation
563
564         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
565 */
566 extern int rmr_set_rtimeout( void* vctx, int time ) {
567         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
568         return 0;
569 }
570
571
572 /*
573         This is the actual init workhorse. The user visible function meerly ensures that the
574         calling programme does NOT set any internal flags that are supported, and then
575         invokes this.  Internal functions (the route table collector) which need additional
576         open ports without starting additional route table collectors, will invoke this
577         directly with the proper flag.
578
579         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
580                                 that we know about. The _user_ should ensure that the supplied length also
581                                 includes the trace data length maximum as they are in control of that.
582 */
583 static void* init( char* uproto_port, int def_msg_size, int flags ) {
584         static  int announced = 0;
585         uta_ctx_t*      ctx = NULL;
586         char    bind_info[256];                         // bind info
587         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
588         char*   port;                                           // pointer into the proto_port buffer at the port value
589         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
590         char*   proto_port;
591         char    wbuf[1024];                                     // work buffer
592         char*   tok;                                            // pointer at token in a buffer
593         char*   tok2;
594         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
595         int             state;
596         int             i;
597         int             old_vlevel;
598
599         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
600
601         if( ! announced ) {
602                 rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
603                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
604                         uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
605                 announced = 1;
606
607                 rmr_set_vlevel( old_vlevel );           // return logging to the desired state
608                 uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
609         }
610
611         errno = 0;
612         if( uproto_port == NULL ) {
613                 proto_port = strdup( DEF_COMM_PORT );
614                 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
615         } else {
616                 proto_port = strdup( uproto_port );             // so we can modify it
617         }
618
619         if ( proto_port == NULL ){
620                 errno = ENOMEM;
621                 return NULL;
622         }
623
624         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
625                 errno = ENOMEM;
626                 goto err;
627         }
628         memset( ctx, 0, sizeof( uta_ctx_t ) );
629
630         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
631         ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
632         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
633         ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
634         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
635         for( i = 0; i < ctx->nrivers; i++ ) {
636                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
637         }
638
639         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
640         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
641         ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
642         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
643
644         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
645         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
646
647         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
648                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
649                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
650                 uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
651         } else {
652                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
653         }
654         init_mtcall( ctx );                                                             // set up call chutes
655         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
656
657
658         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
659         if( def_msg_size > 0 ) {
660                 ctx->max_plen = def_msg_size;
661         }
662
663         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
664         if( ctx->si_ctx == NULL ) {
665                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
666                 goto err;
667         }
668
669         if( (port = strchr( proto_port, ':' )) != NULL ) {
670                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
671                         port++;
672                 } else {
673                         *(port++) = 0;                  // term proto string and point at port string
674                         proto = proto_port;             // user supplied proto so point at it rather than default
675                 }
676         } else {
677                 port = proto_port;                      // assume something like "1234" was passed
678         }
679         rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
680
681         if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
682                 static_rtc = 1;
683         }
684
685         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
686                 tok = strdup( tok );                                    // something we can destroy
687                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
688                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
689                 } else {
690                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
691                 }
692                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
693                         *tok2 = 0;                                                      // make it so
694                 }
695
696                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
697                 free( tok );
698         } else {
699                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
700                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
701                         goto err;
702                 }
703                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
704                         *tok = 0;                                                                       // we don't keep domain portion
705                 }
706         }
707
708         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
709         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
710                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
711                 errno = EINVAL;
712                 goto err;
713         }
714
715         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
716                 if( atoi( tok ) > 0 ) {
717                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
718                 }
719         }
720
721         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
722         if( flags & RMRFL_NAME_ONLY ) {
723                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
724         } else {
725                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
726                 if( ctx->my_ip == NULL ) {
727                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
728                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
729                 }
730         }
731         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
732
733         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
734                 if( *tok == '1' ) {
735                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
736                 }
737         }
738
739
740         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
741                 interface = "0.0.0.0";
742         }
743
744         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
745         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
746                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
747                 goto err;
748         }
749
750                                                                                                 // finish all flag setting before threads to keep helgrind quiet
751         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
752
753
754         // ---------------- setup for route table collector before invoking ----------------------------------
755         ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
756         if( ctx->rtgate != NULL ) {
757                 pthread_mutex_init( ctx->rtgate, NULL );
758         }
759
760         ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
761         if( ctx->ephash == NULL ) {
762                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
763                 errno = ENOMEM;
764                 goto err;
765         }
766
767         ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
768         if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
769                 ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
770         } else {
771                 ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
772
773                 if( static_rtc ) {
774                         rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
775                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
776                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
777                         }
778                 } else {
779                         rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
780                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
781                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
782                         }
783                 }
784         }
785
786         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
787                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
788         }
789
790         free( proto_port );
791         return (void *) ctx;
792
793 err:
794         free( proto_port );
795         free_ctx( ctx );
796         return NULL;
797 }
798
799 /*
800         Initialise the message routing environment. Flags are one of the UTAFL_
801         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
802         (tcp) to be used, then :port is all that is needed.
803
804         At the moment it seems that TCP really is the only viable protocol, but
805         we'll allow flexibility.
806
807         The return value is a void pointer which must be passed to most uta functions. On
808         error, a nil pointer is returned and errno should be set.
809
810         Flags:
811                 No user flags supported (needed) at the moment, but this provides for extension
812                 without drastically changing anything. The user should invoke with RMRFL_NONE to
813                 avoid any misbehavour as there are internal flags which are suported
814 */
815 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
816         return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
817 }
818
819 /*
820         This sets the default trace length which will be added to any message buffers
821         allocated.  It can be set at any time, and if rmr_set_trace() is given a
822         trace len that is different than the default allcoated in a message, the message
823         will be resized.
824
825         Returns 0 on failure and 1 on success. If failure, then errno will be set.
826 */
827 extern int rmr_init_trace( void* vctx, int tr_len ) {
828         uta_ctx_t* ctx;
829
830         errno = 0;
831         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
832                 errno = EINVAL;
833                 return 0;
834         }
835
836         ctx->trace_data_len = tr_len;
837         return 1;
838 }
839
840 /*
841         Return true if routing table is initialised etc. and app can send/receive.
842 */
843 extern int rmr_ready( void* vctx ) {
844         uta_ctx_t *ctx;
845
846         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
847                 return FALSE;
848         }
849
850         return ctx->rtable_ready;
851 }
852
853 /*
854         This returns the message queue ring's filedescriptor which can be used for
855         calls to epoll.  The user shouild NOT read, write, or close the fd.
856
857         Returns the file descriptor or -1 on error.
858 */
859 extern int rmr_get_rcvfd( void* vctx ) {
860         uta_ctx_t* ctx;
861         int state;
862
863         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
864                 return -1;
865         }
866
867         return uta_ring_getpfd( ctx->mring );
868 }
869
870
871 /*
872         Clean up things.
873
874         There isn't an si_flush() per se, but we can pause, generate
875         a context switch, which should allow the last sent buffer to
876         flow. There isn't exactly an nng_term/close either, so there
877         isn't much we can do.
878 */
879 extern void rmr_close( void* vctx ) {
880         uta_ctx_t *ctx;
881
882         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
883                 return;
884         }
885
886         ctx->shutdown = 1;
887
888         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
889
890         // FIX ME -- how to we turn off si; close all sessions etc?
891         //SIclose( ctx->nn_sock );
892
893 }
894
895
896 // ----- multi-threaded call/receive support -------------------------------------------------
897
898 /*
899         Blocks on the receive ring chute semaphore and then reads from the ring
900         when it is tickled.  If max_wait is -1 then the function blocks until
901         a message is ready on the ring. Else max_wait is assumed to be the number
902         of millaseconds to wait before returning a timeout message.
903 */
904 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
905         uta_ctx_t*      ctx;
906         chute_t*        chute;
907         struct timespec ts;                     // time info if we have a timeout
908         long    new_ms;                         // adjusted mu-sec
909         long    seconds = 0;            // max wait seconds
910         long    nano_sec;                       // max wait xlated to nano seconds
911         int             state;
912         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
913
914         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
915                 errno = EINVAL;
916                 if( mbuf ) {
917                         mbuf->state = RMR_ERR_BADARG;
918                         mbuf->tp_state = errno;
919                 }
920                 return mbuf;
921         }
922
923         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
924
925         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
926
927         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
928                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
929                         clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
930                         sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
931                         if( ombuf ) {
932                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
933                         }
934                 } else {
935                         mbuf = ombuf;                                           // return original if it was given with timeout status
936                         if( ombuf != NULL ) {
937                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
938                                 mbuf->len = 0;
939                         }
940                 }
941
942                 if( mbuf != NULL ) {
943                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
944                 }
945
946                 return mbuf;
947         }
948
949         if( ombuf ) {
950                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
951                 ombuf->len = 0;
952         }
953         if( max_wait > 0 ) {
954                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
955
956                 if( max_wait > 999 ) {
957                         seconds = max_wait / 1000;
958                         max_wait -= seconds * 1000;
959                         ts.tv_sec += seconds;
960                 }
961                 if( max_wait > 0 ) {
962                         nano_sec = max_wait * 1000000;
963                         ts.tv_nsec += nano_sec;
964                         if( ts.tv_nsec > 999999999 ) {
965                                 ts.tv_nsec -= 999999999;
966                                 ts.tv_sec++;
967                         }
968                 }
969
970                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
971         }
972
973         errno = EINTR;
974         state = -1;
975         while( state < 0 && errno == EINTR ) {
976                 if( seconds ) {
977                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
978                 } else {
979                         state = sem_wait( &chute->barrier );
980                 }
981         }
982
983         if( state < 0 ) {
984                 mbuf = ombuf;                           // return caller's buffer if they passed one in
985         } else {
986                 errno = 0;                                              // interrupted call state could be left; clear
987                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
988                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
989                         mbuf->state = RMR_OK;
990                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
991
992                         if( ombuf ) {
993                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
994                         }
995                 } else {
996                         errno = ETIMEDOUT;
997                         mbuf = ombuf;                           // no buffer, return user's if there
998                 }
999         }
1000
1001         if( mbuf ) {
1002                 mbuf->tp_state = errno;
1003         }
1004         return mbuf;
1005 }
1006
1007
1008
1009
1010 /*
1011         This is the work horse for the multi-threaded call() function. It supports
1012         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
1013         for for rmr_mt_call() modulo the caveat below.
1014
1015         If endpoint is given, then we assume that we're not doing normal route table
1016         routing and that we should send directly to that endpoint (probably worm
1017         hole).
1018 */
1019 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1020         rmr_mbuf_t* ombuf;                      // original mbuf passed in
1021         uta_ctx_t*      ctx;
1022         uta_mhdr_t*     hdr;                    // header in the transport buffer
1023         chute_t*        chute;
1024         unsigned char*  d1;                     // d1 data in header
1025         struct timespec ts;                     // time info if we have a timeout
1026         long    new_ms;                         // adjusted mu-sec
1027         long    seconds = 0;            // max wait seconds
1028         long    nano_sec;                       // max wait xlated to nano seconds
1029         int             state;
1030
1031         errno = EINVAL;
1032         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1033                 if( mbuf ) {
1034                         mbuf->tp_state = errno;
1035                         mbuf->state = RMR_ERR_BADARG;
1036                 }
1037                 return mbuf;
1038         }
1039
1040         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1041                 mbuf->state = RMR_ERR_NOTSUPP;
1042                 mbuf->tp_state = errno;
1043                 return mbuf;
1044         }
1045
1046         ombuf = mbuf;                                                                                                   // save to return timeout status with
1047
1048         chute = &ctx->chutes[call_id];
1049         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
1050                 rmr_free_msg( chute->mbuf );
1051                 chute->mbuf = NULL;
1052         }
1053
1054         hdr = (uta_mhdr_t *) mbuf->header;
1055         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
1056         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
1057         d1 = DATA1_ADDR( hdr );
1058         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1059         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1060
1061         if( max_wait >= 0 ) {
1062                 clock_gettime( CLOCK_REALTIME, &ts );
1063
1064                 if( max_wait > 999 ) {
1065                         seconds = max_wait / 1000;
1066                         max_wait -= seconds * 1000;
1067                         ts.tv_sec += seconds;
1068                 }
1069                 if( max_wait > 0 ) {
1070                         nano_sec = max_wait * 1000000;
1071                         ts.tv_nsec += nano_sec;
1072                         if( ts.tv_nsec > 999999999 ) {
1073                                 ts.tv_nsec -= 999999999;
1074                                 ts.tv_sec++;
1075                         }
1076                 }
1077
1078                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1079         }
1080
1081         if( ep == NULL ) {                                                                              // normal routing
1082                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1083         } else {
1084                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1085         }
1086         if( mbuf ) {
1087                 if( mbuf->state != RMR_OK ) {
1088                         mbuf->tp_state = errno;
1089                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1090                 }
1091         }
1092
1093         state = 0;
1094         errno = 0;
1095         while( chute->mbuf == NULL && ! errno ) {
1096                 if( seconds ) {
1097                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1098                 } else {
1099                         state = sem_wait( &chute->barrier );
1100                 }
1101
1102                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1103                         errno = 0;
1104                 }
1105
1106                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1107                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1108                                 rmr_free_msg( chute->mbuf );
1109                                 chute->mbuf = NULL;
1110                                 errno = 0;
1111                         }
1112                 }
1113         }
1114
1115         if( state < 0 ) {
1116                 return NULL;                                    // leave errno as set by sem wait call
1117         }
1118
1119         mbuf = chute->mbuf;
1120         if( mbuf != NULL ) {
1121                 mbuf->state = RMR_OK;
1122         }
1123         chute->mbuf = NULL;
1124
1125         return mbuf;
1126 }
1127
1128 /*
1129         Accept a message buffer and caller ID, send the message and then wait
1130         for the receiver to tickle the semaphore letting us know that a message
1131         has been received. The call_id is a value between 2 and 255, inclusive; if
1132         it's not in this range an error will be returned. Max wait is the amount
1133         of time in millaseconds that the call should block for. If 0 is given
1134         then no timeout is set.
1135
1136         If the mt_call feature has not been initialised, then the attempt to use this
1137         funciton will fail with RMR_ERR_NOTSUPP
1138
1139         If no matching message is received before the max_wait period expires, a
1140         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1141         occurs after the message has been sent, then a nil pointer is returned
1142         with errno set to some other value.
1143
1144         This is now just an outward facing wrapper so we can support wormhole calls.
1145 */
1146 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1147
1148         // must vet call_id here, all others vetted by workhorse mt_call() function
1149         if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
1150                 if( mbuf != NULL ) {
1151                         mbuf->state = RMR_ERR_BADARG;
1152                         mbuf->tp_state = EINVAL;
1153                 }
1154                 return mbuf;
1155         }
1156
1157         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1158 }
1159
1160
1161 /*
1162         Given an existing message buffer, reallocate the payload portion to
1163         be at least new_len bytes.  The message header will remain such that
1164         the caller may use the rmr_rts_msg() function to return a payload
1165         to the sender.
1166
1167         The mbuf passed in may or may not be reallocated and the caller must
1168         use the returned pointer and should NOT assume that it can use the
1169         pointer passed in with the exceptions based on the clone flag.
1170
1171         If the clone flag is set, then a duplicated message, with larger payload
1172         size, is allocated and returned.  The old_msg pointer in this situation is
1173         still valid and must be explicitly freed by the application. If the clone
1174         message is not set (0), then any memory management of the old message is
1175         handled by the function.
1176
1177         If the copy flag is set, the contents of the old message's payload is
1178         copied to the reallocated payload.  If the flag is not set, then the
1179         contents of the payload is undetermined.
1180 */
1181 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1182         if( old_msg == NULL ) {
1183                 return NULL;
1184         }
1185
1186         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1187 }
1188
1189 /*
1190         Enable low latency things in the transport (when supported).
1191 */
1192 extern void rmr_set_low_latency( void* vctx ) {
1193         uta_ctx_t*      ctx;
1194
1195         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1196                 if( ctx->si_ctx != NULL ) {
1197                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1198                 }
1199         }
1200 }
1201
1202 /*
1203         Turn on fast acks.
1204 */
1205 extern void rmr_set_fack( void* vctx ) {
1206         uta_ctx_t*      ctx;
1207
1208         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1209                 if( ctx->si_ctx != NULL ) {
1210                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1211                 }
1212         }
1213 }
1214