0a4f80644f301430716e572f1c151d31395559c7
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "rtc_static.c"                         // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c"                       // send/receive static functions
73 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
76
77
78 //------------------------------------------------------------------------------
79
80
81 /*
82         Clean up a context.
83 */
84 static void free_ctx( uta_ctx_t* ctx ) {
85         if( ctx ) {
86                 if( ctx->rtg_addr ) {
87                         free( ctx->rtg_addr );
88                 }
89         }
90 }
91
92 // --------------- public functions --------------------------------------------------------------------------
93
94 /*
95         Returns the size of the payload (bytes) that the msg buffer references.
96         Len in a message is the number of bytes which were received, or should
97         be transmitted, however, it is possible that the mbuf was allocated
98         with a larger payload space than the payload length indicates; this
99         function returns the absolute maximum space that the user has available
100         in the payload. On error (bad msg buffer) -1 is returned and errno should
101         indicate the rason.
102
103         The allocated len stored in the msg is:
104                 transport header length +
105                 message header + 
106                 user requested payload 
107
108         The msg header is a combination of the fixed RMR header and the variable
109         trace data and d2 fields which may vary for each message.
110 */
111 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
112         if( msg == NULL || msg->header == NULL ) {
113                 errno = EINVAL;
114                 return -1;
115         }
116
117         errno = 0;
118         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
119 }
120
121 /*
122         Allocates a send message as a zerocopy message allowing the underlying message protocol
123         to send the buffer without copy.
124 */
125 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
126         uta_ctx_t*      ctx;
127         rmr_mbuf_t*     m;
128
129         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
130                 return NULL;
131         }
132
133         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
134         return  m;
135 }
136
137
138 /*
139         Allocates a send message as a zerocopy message allowing the underlying message protocol
140         to send the buffer without copy. In addition, a trace data field of tr_size will be
141         added and the supplied data coppied to the buffer before returning the message to
142         the caller.
143 */
144 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
145         uta_ctx_t*      ctx;
146         rmr_mbuf_t*     m;
147         int state;
148
149         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
150                 return NULL;
151         }
152
153         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
154         if( m != NULL ) {
155                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
156                 if( state != tr_size ) {
157                         m->state = RMR_ERR_INITFAILED;
158                 }
159         }
160
161         return  m;
162 }
163
164 /*
165         This provides an external path to the realloc static function as it's called by an
166         outward facing mbuf api function. Used to reallocate a message with a different
167         trace data size.
168 */
169 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
170         return realloc_msg( msg, new_tr_size );
171 }
172
173
174 /*
175         Return the message to the available pool, or free it outright.
176 */
177 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
178         //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
179         //return;
180
181         if( mbuf == NULL ) {
182                 return;
183         }
184
185         if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
186                 if( mbuf->tp_buf ) {
187                         free( mbuf->tp_buf );
188                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
189                 }
190
191                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
192                 free( mbuf );
193         }
194 }
195
196 /*
197         This is a wrapper to the real timeout send. We must wrap it now to ensure that
198         the call flag and call-id are reset
199 */
200 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
201         char* d1;                                                                                                                       // point at the call-id in the header
202
203         if( msg != NULL ) {
204                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
205
206                 d1 = DATA1_ADDR( msg->header );
207                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
208         }       
209
210         return mtosend_msg( vctx, msg, max_to );
211 }
212
213 /*
214         Send with default max timeout as is set in the context.
215         See rmr_mtosend_msg() for more details on the parameters.
216         See rmr_stimeout() for info on setting the default timeout.
217 */
218 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
219         char* d1;                                                                                                               // point at the call-id in the header
220
221         if( msg != NULL ) {
222                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
223
224                 d1 = DATA1_ADDR( msg->header );
225                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
226         }       
227
228         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
229 }
230
231 /*
232         Return to sender allows a message to be sent back to the endpoint where it originated.
233
234         In the SI world the file descriptor that was the source of the message is captured in
235         the mbuffer and thus can be used to quickly find the target for an RTS call. 
236
237         The source information in the message is used to select the socket on which to write
238         the message rather than using the message type and round-robin selection. This
239         should return a message buffer with the state of the send operation set. On success
240         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
241         error it can be passed back to this function to retry the send if desired. On error,
242         errno will liklely have the failure reason set by the nng send processing.
243         The following are possible values for the state in the message buffer:
244
245         Message states returned:
246                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
247                 RMR_ERR_NOHDR  - message did not have a header
248                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
249                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
250                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
251
252         A nil message as the return value is rare, and generally indicates some kind of horrible
253         failure. The value of errno might give a clue as to what is wrong.
254
255         CAUTION:
256                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
257                 The caller must check for this and handle it properly.
258 */
259 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
260         int                     nn_sock;                        // endpoint socket for send
261         uta_ctx_t*      ctx;
262         int                     state;
263         char*           hold_src;                       // we need the original source if send fails
264         char*           hold_ip;                        // also must hold original ip
265         int                     sock_ok = 0;            // true if we found a valid endpoint socket
266         endpoint_t*     ep = NULL;                      // end point to track counts
267
268         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
269                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
270                 if( msg != NULL ) {
271                         msg->state = RMR_ERR_BADARG;
272                         msg->tp_state = errno;
273                 }
274                 return msg;
275         }
276
277         errno = 0;                                                                                                              // at this point any bad state is in msg returned
278         if( msg->header == NULL ) {
279                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
280                 msg->state = RMR_ERR_NOHDR;
281                 msg->tp_state = errno;
282                 return msg;
283         }
284
285         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
286
287 /*
288         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
289         if( ! sock_ok ) {
290 */
291         if( (nn_sock = msg->rts_fd) < 0 ) {
292                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
293                         //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
294                         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
295                 }
296                 if( ! sock_ok ) {
297                         msg->state = RMR_ERR_NOENDPT;
298                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
299                 }
300         }
301
302         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
303         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
304         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
305         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
306         msg = send_msg( ctx, msg, nn_sock, -1 );
307         if( msg ) {
308                 if( ep != NULL ) {
309                         switch( msg->state ) {
310                                 case RMR_OK:
311                                         ep->scounts[EPSC_GOOD]++;
312                                         break;
313                         
314                                 case RMR_ERR_RETRY:
315                                         ep->scounts[EPSC_TRANS]++;
316                                         break;
317
318                                 default:
319                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
320                                         ep->scounts[EPSC_FAIL]++;
321                                         break;
322                         }
323                 }
324                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
325                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
326                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
327         }
328
329         free( hold_src );
330         free( hold_ip );
331         return msg;
332 }
333
334 /*
335         If multi-threading call is turned on, this invokes that mechanism with the special call
336         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
337         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
338         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
339         a flexible timeout.
340
341         On timeout this function will return a nil pointer. If the original message could not
342         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
343
344         Original behavour:
345         Call sends the message based on message routing using the message type, and waits for a
346         response message to arrive with the same transaction id that was in the outgoing message.
347         If, while wiating for the expected response,  messages are received which do not have the
348         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
349         order that they were received.
350
351         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
352         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
353         may be resent (likely the context pointer was nil).  If the message is sent, but no
354         response is received, a nil message is returned with errno set to indicate the likley
355         issue:
356                 ETIMEDOUT -- too many messages were queued before reciving the expected response
357                 ENOBUFS -- the queued message ring is full, messages were dropped
358                 EINVAL  -- A parameter was not valid
359                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
360                                         user should call this function with the message again.
361
362 */
363 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
364         uta_ctx_t*              ctx;
365
366         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
367                 if( msg != NULL ) {
368                         msg->state = RMR_ERR_BADARG;
369                 }
370                 return msg;
371         }
372
373         return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
374 }
375
376 /*
377         The outward facing receive function. When invoked it will pop the oldest message
378         from the receive ring, if any are queued, and return it. If the ring is empty
379         then the receive function is invoked to wait for the next message to arrive (blocking).
380
381         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
382         nil, a new one will be allocated. However, the caller should NOT expect to get the same
383         struct back (if a queued message is returned the message struct will be different).
384 */
385 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
386         uta_ctx_t*      ctx;
387         rmr_mbuf_t*     qm;                             // message that was queued on the ring
388
389         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
390                 errno = EINVAL;
391                 if( old_msg != NULL ) {
392                         old_msg->state = RMR_ERR_BADARG;
393                         old_msg->tp_state = errno;
394                 }
395                 return old_msg;
396         }
397         errno = 0;
398
399         return rmr_mt_rcv( ctx, old_msg, -1 );
400 }
401
402 /*
403         This allows a timeout based receive for applications unable to implement epoll_wait()
404         (e.g. wrappers).
405 */
406 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
407         uta_ctx_t*      ctx;
408
409         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
410                 errno = EINVAL;
411                 if( old_msg != NULL ) {
412                         old_msg->state = RMR_ERR_BADARG;
413                         old_msg->tp_state = errno;
414                 }
415                 return old_msg;
416         }
417
418         return rmr_mt_rcv( ctx, old_msg, ms_to );
419 }
420
421 /*
422         This blocks until the message with the 'expect' ID is received. Messages which are received
423         before the expected message are queued onto the message ring.  The function will return
424         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
425         expected message is received. If the queued message ring fills a nil pointer is returned
426         and errno is set to ENOBUFS.
427
428         Generally this will be invoked only by the call() function as it waits for a response, but
429         it is exposed to the user application as three is no reason not to.
430 */
431 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
432         uta_ctx_t*      ctx;
433         int     queued = 0;                             // number we pushed into the ring
434         int     exp_len = 0;                    // length of expected ID
435
436         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
437                 errno = EINVAL;
438                 if( msg != NULL ) {
439                         msg->state = RMR_ERR_BADARG;
440                         msg->tp_state = errno;
441                 }
442                 return msg;
443         }
444
445         errno = 0;
446
447         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
448                 return rmr_rcv_msg( ctx, msg );
449         }
450
451         exp_len = strlen( expect );
452         if( exp_len > RMR_MAX_XID ) {
453                 exp_len = RMR_MAX_XID;
454         }
455         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
456
457         while( queued < allow2queue ) {
458                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
459                 if( msg->state == RMR_OK ) {
460                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
461                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
462                                 return msg;
463                         }
464
465                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
466                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
467                                 errno = ENOBUFS;
468                                 return NULL;
469                         }
470
471                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
472                         queued++;
473                         msg = NULL;
474                 }
475         }
476
477         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
478         errno = ETIMEDOUT;
479         return NULL;
480 }
481
482 /*
483         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
484         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
485         mechnism indicates eagain or etimeedout.  All other error conditions are reported
486         without this delay. Setting a timeout of 0 causes no retries to be attempted in
487         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
488         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
489         after every 1K send attempts until the "time" value is reached. Retries are abandoned
490         if NNG returns anything other than EAGAIN or EINTER is returned.
491
492         The default, if this function is not used, is 1; meaning that RMr will retry, but will
493         not enter a sleep.  In all cases the caller should check the status in the message returned
494         after a send call.
495
496         Returns -1 if the context was invalid; RMR_OK otherwise.
497 */
498 extern int rmr_set_stimeout( void* vctx, int time ) {
499         uta_ctx_t*      ctx;
500
501         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
502                 return -1;
503         }
504
505         if( time < 0 ) {
506                 time = 0;
507         }
508
509         ctx->send_retries = time;
510         return RMR_OK;
511 }
512
513 /*
514         Set receive timeout -- not supported in nng implementation
515
516         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
517 */
518 extern int rmr_set_rtimeout( void* vctx, int time ) {
519         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
520         return 0;
521 }
522
523
524 /*
525         This is the actual init workhorse. The user visible function meerly ensures that the
526         calling programme does NOT set any internal flags that are supported, and then
527         invokes this.  Internal functions (the route table collector) which need additional
528         open ports without starting additional route table collectors, will invoke this
529         directly with the proper flag.
530
531         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
532                                 that we know about. The _user_ should ensure that the supplied length also
533                                 includes the trace data length maximum as they are in control of that.
534 */
535 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
536         static  int announced = 0;
537         uta_ctx_t*      ctx = NULL;
538         char    bind_info[256];                         // bind info
539         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
540         char*   port;
541         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
542         char*   proto_port;
543         char    wbuf[1024];                                     // work buffer
544         char*   tok;                                            // pointer at token in a buffer
545         char*   tok2;
546         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
547         int             state;
548         int             i;
549         int             old_vlevel;
550
551         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
552         rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
553
554         if( ! announced ) {
555                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
556                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
557                 announced = 1;
558         }
559         rmr_set_vlevel( old_vlevel );           // return logging to the desired state
560
561         errno = 0;
562         if( uproto_port == NULL ) {
563                 proto_port = strdup( DEF_COMM_PORT );
564         } else {
565                 proto_port = strdup( uproto_port );             // so we can modify it
566         }
567
568         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
569                 errno = ENOMEM;
570                 return NULL;
571         }
572         memset( ctx, 0, sizeof( uta_ctx_t ) );
573
574         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
575         ctx->nrivers = 256;                                                             // number of input flows we'll manage
576         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
577         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
578         for( i = 0; i < ctx->nrivers; i++ ) {
579                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
580         }
581
582         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
583         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
584         ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
585         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
586
587         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
588         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
589
590         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
591                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
592                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
593         } else {
594                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
595         }
596         init_mtcall( ctx );                                                             // set up call chutes
597         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
598
599
600         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
601         if( max_msg_size > 0 ) {
602                 ctx->max_plen = max_msg_size;
603         }
604
605         // we're using a listener to get rtg updates, so we do NOT need this.
606         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
607
608         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
609         if( ctx->si_ctx == NULL ) {
610                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
611                 free_ctx( ctx );
612                 return NULL;
613         }
614
615         if( (port = strchr( proto_port, ':' )) != NULL ) {
616                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
617                         port++;
618                 } else {
619                         *(port++) = 0;                  // term proto string and point at port string
620                         proto = proto_port;             // user supplied proto so point at it rather than default
621                 }
622         } else {
623                 port = proto_port;                      // assume something like "1234" was passed
624         }
625
626         if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
627                 if( atoi( tok ) < 1 ) {
628                         static_rtc = 1;
629                 }
630         }
631
632         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
633                 tok = strdup( tok );                                    // something we can destroy
634                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
635                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
636                 } else {
637                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
638                 }
639                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
640                         *tok2 = 0;                                                      // make it so
641                 }
642
643                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
644                 free( tok );
645         } else {
646                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
647                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
648                         return NULL;
649                 }
650                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
651                         *tok = 0;                                                                       // we don't keep domain portion
652                 }
653         }
654
655         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
656         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
657                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
658                 return NULL;
659         }
660
661         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
662                 if( atoi( tok ) > 0 ) {
663                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
664                 }
665         }
666
667         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
668         if( flags & RMRFL_NAME_ONLY ) {
669                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
670         } else {
671                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
672                 if( ctx->my_ip == NULL ) {
673                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
674                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
675                 }
676         }
677         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
678
679         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
680                 if( *tok == '1' ) {
681                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
682                 }
683         }
684
685
686         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
687                 interface = "0.0.0.0";
688         }
689         
690         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
691         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
692                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
693                 free_ctx( ctx );
694                 return NULL;
695         }
696
697         if( flags & FL_NOTHREAD ) {                                     // thread set to off; no rout table collector started (could be called by the rtc thread itself)
698                 ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
699         } else {
700                 if( static_rtc ) {
701                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
702                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
703                         }
704                 } else {
705                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
706                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
707                         }
708                 }
709         }
710
711         ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
712         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
713                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
714         }
715
716         free( proto_port );
717         return (void *) ctx;
718 }
719
720 /*
721         Initialise the message routing environment. Flags are one of the UTAFL_
722         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
723         (tcp) to be used, then :port is all that is needed.
724
725         At the moment it seems that TCP really is the only viable protocol, but
726         we'll allow flexibility.
727
728         The return value is a void pointer which must be passed to most uta functions. On
729         error, a nil pointer is returned and errno should be set.
730
731         Flags:
732                 No user flags supported (needed) at the moment, but this provides for extension
733                 without drastically changing anything. The user should invoke with RMRFL_NONE to
734                 avoid any misbehavour as there are internal flags which are suported
735 */
736 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
737         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
738 }
739
740 /*
741         This sets the default trace length which will be added to any message buffers
742         allocated.  It can be set at any time, and if rmr_set_trace() is given a
743         trace len that is different than the default allcoated in a message, the message
744         will be resized.
745
746         Returns 0 on failure and 1 on success. If failure, then errno will be set.
747 */
748 extern int rmr_init_trace( void* vctx, int tr_len ) {
749         uta_ctx_t* ctx;
750
751         errno = 0;
752         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
753                 errno = EINVAL;
754                 return 0;
755         }
756
757         ctx->trace_data_len = tr_len;
758         return 1;
759 }
760
761 /*
762         Return true if routing table is initialised etc. and app can send/receive.
763 */
764 extern int rmr_ready( void* vctx ) {
765         uta_ctx_t *ctx;
766
767         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
768                 return FALSE;
769         }
770
771         if( ctx->rtable != NULL ) {
772                 return TRUE;
773         }
774
775         return FALSE;
776 }
777
778 /*
779         This returns the message queue ring's filedescriptor which can be used for
780         calls to epoll.  The user shouild NOT read, write, or close the fd.
781
782         Returns the file descriptor or -1 on error.
783 */
784 extern int rmr_get_rcvfd( void* vctx ) {
785         uta_ctx_t* ctx;
786         int state;
787
788         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
789                 return -1;
790         }
791
792 /*
793         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
794                 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
795                 return -1;
796         }
797 */
798
799         return uta_ring_getpfd( ctx->mring );
800 }
801
802
803 /*
804         Clean up things.
805
806         There isn't an si_flush() per se, but we can pause, generate
807         a context switch, which should allow the last sent buffer to
808         flow. There isn't exactly an nng_term/close either, so there
809         isn't much we can do.
810 */
811 extern void rmr_close( void* vctx ) {
812         uta_ctx_t *ctx;
813
814         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
815                 return;
816         }
817
818         ctx->shutdown = 1;
819
820         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
821
822         // FIX ME -- how to we turn off si; close all sessions etc?
823         //SIclose( ctx->nn_sock );
824
825 }
826
827
828 // ----- multi-threaded call/receive support -------------------------------------------------
829
830 /*
831         Blocks on the receive ring chute semaphore and then reads from the ring
832         when it is tickled.  If max_wait is -1 then the function blocks until
833         a message is ready on the ring. Else max_wait is assumed to be the number
834         of millaseconds to wait before returning a timeout message.
835 */
836 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
837         uta_ctx_t*      ctx;
838         uta_mhdr_t*     hdr;                    // header in the transport buffer
839         chute_t*        chute;
840         struct timespec ts;                     // time info if we have a timeout
841         long    new_ms;                         // adjusted mu-sec
842         long    seconds = 0;            // max wait seconds
843         long    nano_sec;                       // max wait xlated to nano seconds
844         int             state;
845         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
846         
847         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
848                 errno = EINVAL;
849                 if( mbuf ) {
850                         mbuf->state = RMR_ERR_BADARG;
851                         mbuf->tp_state = errno;
852                 }
853                 return mbuf;
854         }
855
856         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
857
858         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
859
860         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
861                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
862                         if( ombuf ) {
863                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
864                         }       
865                 } else {
866                         mbuf = ombuf;                                           // return original if it was given with timeout status
867                         if( ombuf != NULL ) {
868                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
869                                 mbuf->len = 0;
870                         }
871                 }
872
873                 return mbuf;
874         }
875
876         if( ombuf ) {
877                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
878                 ombuf->len = 0;
879         }
880         if( max_wait > 0 ) {
881                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
882
883                 if( max_wait > 999 ) {
884                         seconds = max_wait / 1000;
885                         max_wait -= seconds * 1000;
886                         ts.tv_sec += seconds;
887                 }
888                 if( max_wait > 0 ) {
889                         nano_sec = max_wait * 1000000;
890                         ts.tv_nsec += nano_sec;
891                         if( ts.tv_nsec > 999999999 ) {
892                                 ts.tv_nsec -= 999999999;
893                                 ts.tv_sec++;
894                         }
895                 }
896
897                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
898         }
899
900         errno = EINTR;
901         state = -1;
902         while( state < 0 && errno == EINTR ) {
903                 if( seconds ) {
904                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
905                 } else {
906                         state = sem_wait( &chute->barrier );
907                 }
908         }
909
910         if( state < 0 ) {
911                 mbuf = ombuf;                           // return caller's buffer if they passed one in
912         } else {
913                 errno = 0;                                              // interrupted call state could be left; clear
914                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
915                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
916                         mbuf->state = RMR_OK;
917
918                         if( ombuf ) {
919                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
920                         }
921                 } else {
922                         errno = ETIMEDOUT;
923                         mbuf = ombuf;                           // no buffer, return user's if there
924                 }
925         }
926
927         if( mbuf ) {
928                 mbuf->tp_state = errno;
929         }
930         return mbuf;
931 }
932
933 /*
934         Accept a message buffer and caller ID, send the message and then wait
935         for the receiver to tickle the semaphore letting us know that a message
936         has been received. The call_id is a value between 2 and 255, inclusive; if
937         it's not in this range an error will be returned. Max wait is the amount
938         of time in millaseconds that the call should block for. If 0 is given
939         then no timeout is set.
940
941         If the mt_call feature has not been initialised, then the attempt to use this
942         funciton will fail with RMR_ERR_NOTSUPP
943
944         If no matching message is received before the max_wait period expires, a
945         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
946         occurs after the message has been sent, then a nil pointer is returned
947         with errno set to some other value.
948 */
949 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
950         rmr_mbuf_t* ombuf;                      // original mbuf passed in
951         uta_ctx_t*      ctx;
952         uta_mhdr_t*     hdr;                    // header in the transport buffer
953         chute_t*        chute;
954         unsigned char*  d1;                     // d1 data in header
955         struct timespec ts;                     // time info if we have a timeout
956         long    new_ms;                         // adjusted mu-sec
957         long    seconds = 0;            // max wait seconds
958         long    nano_sec;                       // max wait xlated to nano seconds
959         int             state;
960         
961         errno = EINVAL;
962         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
963                 if( mbuf ) {
964                         mbuf->tp_state = errno;
965                         mbuf->state = RMR_ERR_BADARG;
966                 }
967                 return mbuf;
968         }
969
970         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
971                 mbuf->state = RMR_ERR_NOTSUPP;
972                 mbuf->tp_state = errno;
973                 return mbuf;
974         }
975
976         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
977                 mbuf->state = RMR_ERR_BADARG;
978                 mbuf->tp_state = errno;
979                 return mbuf;
980         }
981
982         ombuf = mbuf;                                                                                                   // save to return timeout status with
983
984         chute = &ctx->chutes[call_id];
985         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
986                 rmr_free_msg( chute->mbuf );
987                 chute->mbuf = NULL;
988         }
989         
990         hdr = (uta_mhdr_t *) mbuf->header;
991         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
992         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
993         d1 = DATA1_ADDR( hdr );
994         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
995         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
996
997         if( max_wait >= 0 ) {
998                 clock_gettime( CLOCK_REALTIME, &ts );   
999
1000                 if( max_wait > 999 ) {
1001                         seconds = max_wait / 1000;
1002                         max_wait -= seconds * 1000;
1003                         ts.tv_sec += seconds;
1004                 }
1005                 if( max_wait > 0 ) {
1006                         nano_sec = max_wait * 1000000;
1007                         ts.tv_nsec += nano_sec;
1008                         if( ts.tv_nsec > 999999999 ) {
1009                                 ts.tv_nsec -= 999999999;
1010                                 ts.tv_sec++;
1011                         }
1012                 }
1013
1014                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1015         }
1016
1017         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1018         if( mbuf ) {
1019                 if( mbuf->state != RMR_OK ) {
1020                         mbuf->tp_state = errno;
1021                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1022                 }
1023         }
1024
1025         state = 0;
1026         errno = 0;
1027         while( chute->mbuf == NULL && ! errno ) {
1028                 if( seconds ) {
1029                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1030                 } else {
1031                         state = sem_wait( &chute->barrier );
1032                 }
1033
1034                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1035                         errno = 0;
1036                 }
1037
1038                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1039                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1040                                 rmr_free_msg( chute->mbuf );
1041                                 chute->mbuf = NULL;
1042                                 errno = 0;
1043                         }
1044                 }
1045         }
1046
1047         if( state < 0 ) {
1048                 return NULL;                                    // leave errno as set by sem wait call
1049         }
1050
1051         mbuf = chute->mbuf;
1052         mbuf->state = RMR_OK;
1053         chute->mbuf = NULL;
1054
1055         return mbuf;
1056 }
1057
1058 /*
1059         Given an existing message buffer, reallocate the payload portion to
1060         be at least new_len bytes.  The message header will remain such that
1061         the caller may use the rmr_rts_msg() function to return a payload
1062         to the sender. 
1063
1064         The mbuf passed in may or may not be reallocated and the caller must
1065         use the returned pointer and should NOT assume that it can use the 
1066         pointer passed in with the exceptions based on the clone flag.
1067
1068         If the clone flag is set, then a duplicated message, with larger payload
1069         size, is allocated and returned.  The old_msg pointer in this situation is
1070         still valid and must be explicitly freed by the application. If the clone 
1071         message is not set (0), then any memory management of the old message is
1072         handled by the function.
1073
1074         If the copy flag is set, the contents of the old message's payload is 
1075         copied to the reallocated payload.  If the flag is not set, then the 
1076         contents of the payload is undetermined.
1077 */
1078 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1079         if( old_msg == NULL ) {
1080                 return NULL;
1081         }
1082
1083         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1084 }
1085
1086 /*
1087         Enable low latency things in the transport (when supported).
1088 */
1089 extern void rmr_set_low_latency( void* vctx ) {
1090         uta_ctx_t*      ctx;
1091
1092         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1093                 if( ctx->si_ctx != NULL ) {
1094                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1095                 }
1096         }
1097 }
1098
1099 /*
1100         Turn on fast acks.
1101 */
1102 extern void rmr_set_fack( void* vctx ) {
1103         uta_ctx_t*      ctx;
1104
1105         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1106                 if( ctx->si_ctx != NULL ) {
1107                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1108                 }
1109         }
1110 }
1111