Fix session discconnect bug in interface to SI95
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59
60 #include "rmr.h"                                // things the users see
61 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
62 #include "rmr_si_private.h"             // things that we need too
63 #include "rmr_symtab.h"
64 #include "rmr_logging.h"
65
66 #include "ring_static.c"                        // message ring support
67 #include "rt_generic_static.c"          // route table things not transport specific
68 #include "rtable_si_static.c"           // route table things -- transport specific
69 #include "rtc_si_static.c"                      // specific RMR only route table collector (SI only for now)
70 #include "tools_static.c"
71 #include "sr_si_static.c"                       // send/receive static functions
72 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
73 #include "mt_call_static.c"
74 #include "mt_call_si_static.c"
75
76
77 //------------------------------------------------------------------------------
78
79
80 /*
81         Clean up a context.
82 */
83 static void free_ctx( uta_ctx_t* ctx ) {
84         if( ctx ) {
85                 if( ctx->rtg_addr ) {
86                         free( ctx->rtg_addr );
87                 }
88         }
89 }
90
91 // --------------- public functions --------------------------------------------------------------------------
92
93 /*
94         Returns the size of the payload (bytes) that the msg buffer references.
95         Len in a message is the number of bytes which were received, or should
96         be transmitted, however, it is possible that the mbuf was allocated
97         with a larger payload space than the payload length indicates; this
98         function returns the absolute maximum space that the user has available
99         in the payload. On error (bad msg buffer) -1 is returned and errno should
100         indicate the rason.
101
102         The allocated len stored in the msg is:
103                 transport header length +
104                 message header + 
105                 user requested payload 
106
107         The msg header is a combination of the fixed RMR header and the variable
108         trace data and d2 fields which may vary for each message.
109 */
110 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
111         if( msg == NULL || msg->header == NULL ) {
112                 errno = EINVAL;
113                 return -1;
114         }
115
116         errno = 0;
117         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
118 }
119
120 /*
121         Allocates a send message as a zerocopy message allowing the underlying message protocol
122         to send the buffer without copy.
123 */
124 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
125         uta_ctx_t*      ctx;
126         rmr_mbuf_t*     m;
127
128         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
129                 return NULL;
130         }
131
132         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
133         return  m;
134 }
135
136
137 /*
138         Allocates a send message as a zerocopy message allowing the underlying message protocol
139         to send the buffer without copy. In addition, a trace data field of tr_size will be
140         added and the supplied data coppied to the buffer before returning the message to
141         the caller.
142 */
143 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
144         uta_ctx_t*      ctx;
145         rmr_mbuf_t*     m;
146         int state;
147
148         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
149                 return NULL;
150         }
151
152         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
153         if( m != NULL ) {
154                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
155                 if( state != tr_size ) {
156                         m->state = RMR_ERR_INITFAILED;
157                 }
158         }
159
160         return  m;
161 }
162
163 /*
164         This provides an external path to the realloc static function as it's called by an
165         outward facing mbuf api function. Used to reallocate a message with a different
166         trace data size.
167 */
168 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
169         return realloc_msg( msg, new_tr_size );
170 }
171
172
173 /*
174         Return the message to the available pool, or free it outright.
175 */
176 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
177         //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
178         //return;
179
180         if( mbuf == NULL ) {
181                 return;
182         }
183
184         if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
185                 if( mbuf->tp_buf ) {
186                         free( mbuf->tp_buf );
187                 }
188                 free( mbuf );
189         }
190 }
191
192 /*
193         This is a wrapper to the real timeout send. We must wrap it now to ensure that
194         the call flag and call-id are reset
195 */
196 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
197         char* d1;                                                                                                                       // point at the call-id in the header
198
199         if( msg != NULL ) {
200                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
201
202                 d1 = DATA1_ADDR( msg->header );
203                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
204         }       
205
206         return mtosend_msg( vctx, msg, max_to );
207 }
208
209 /*
210         Send with default max timeout as is set in the context.
211         See rmr_mtosend_msg() for more details on the parameters.
212         See rmr_stimeout() for info on setting the default timeout.
213 */
214 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
215         char* d1;                                                                                                               // point at the call-id in the header
216
217         if( msg != NULL ) {
218                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
219
220                 d1 = DATA1_ADDR( msg->header );
221                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
222         }       
223
224         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
225 }
226
227 /*
228         Return to sender allows a message to be sent back to the endpoint where it originated.
229
230         In the SI world the file descriptor that was the source of the message is captured in
231         the mbuffer and thus can be used to quickly find the target for an RTS call. 
232
233         The source information in the message is used to select the socket on which to write
234         the message rather than using the message type and round-robin selection. This
235         should return a message buffer with the state of the send operation set. On success
236         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
237         error it can be passed back to this function to retry the send if desired. On error,
238         errno will liklely have the failure reason set by the nng send processing.
239         The following are possible values for the state in the message buffer:
240
241         Message states returned:
242                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
243                 RMR_ERR_NOHDR  - message did not have a header
244                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
245                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
246                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
247
248         A nil message as the return value is rare, and generally indicates some kind of horrible
249         failure. The value of errno might give a clue as to what is wrong.
250
251         CAUTION:
252                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
253                 The caller must check for this and handle it properly.
254 */
255 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
256         int                     nn_sock;                        // endpoint socket for send
257         uta_ctx_t*      ctx;
258         int                     state;
259         char*           hold_src;                       // we need the original source if send fails
260         char*           hold_ip;                        // also must hold original ip
261         int                     sock_ok = 0;            // true if we found a valid endpoint socket
262         endpoint_t*     ep = NULL;                      // end point to track counts
263
264         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
265                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
266                 if( msg != NULL ) {
267                         msg->state = RMR_ERR_BADARG;
268                         msg->tp_state = errno;
269                 }
270                 return msg;
271         }
272
273         errno = 0;                                                                                                              // at this point any bad state is in msg returned
274         if( msg->header == NULL ) {
275                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
276                 msg->state = RMR_ERR_NOHDR;
277                 msg->tp_state = errno;
278                 return msg;
279         }
280
281         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
282
283 /*
284         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
285         if( ! sock_ok ) {
286 */
287         if( (nn_sock = msg->rts_fd) < 0 ) {
288                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
289                         //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
290                         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
291                 }
292                 if( ! sock_ok ) {
293                         msg->state = RMR_ERR_NOENDPT;
294                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
295                 }
296         }
297
298         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
299         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
300         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
301         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
302         msg = send_msg( ctx, msg, nn_sock, -1 );
303         if( msg ) {
304                 if( ep != NULL ) {
305                         switch( msg->state ) {
306                                 case RMR_OK:
307                                         ep->scounts[EPSC_GOOD]++;
308                                         break;
309                         
310                                 case RMR_ERR_RETRY:
311                                         ep->scounts[EPSC_TRANS]++;
312                                         break;
313
314                                 default:
315                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
316                                         ep->scounts[EPSC_FAIL]++;
317                                         break;
318                         }
319                 }
320                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
321                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
322                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
323         }
324
325         free( hold_src );
326         free( hold_ip );
327         return msg;
328 }
329
330 /*
331         If multi-threading call is turned on, this invokes that mechanism with the special call
332         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
333         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
334         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
335         a flexible timeout.
336
337         On timeout this function will return a nil pointer. If the original message could not
338         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
339
340         Original behavour:
341         Call sends the message based on message routing using the message type, and waits for a
342         response message to arrive with the same transaction id that was in the outgoing message.
343         If, while wiating for the expected response,  messages are received which do not have the
344         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
345         order that they were received.
346
347         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
348         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
349         may be resent (likely the context pointer was nil).  If the message is sent, but no
350         response is received, a nil message is returned with errno set to indicate the likley
351         issue:
352                 ETIMEDOUT -- too many messages were queued before reciving the expected response
353                 ENOBUFS -- the queued message ring is full, messages were dropped
354                 EINVAL  -- A parameter was not valid
355                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
356                                         user should call this function with the message again.
357
358 */
359 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
360         uta_ctx_t*              ctx;
361
362         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
363                 if( msg != NULL ) {
364                         msg->state = RMR_ERR_BADARG;
365                 }
366                 return msg;
367         }
368
369         return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
370 }
371
372 /*
373         The outward facing receive function. When invoked it will pop the oldest message
374         from the receive ring, if any are queued, and return it. If the ring is empty
375         then the receive function is invoked to wait for the next message to arrive (blocking).
376
377         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
378         nil, a new one will be allocated. However, the caller should NOT expect to get the same
379         struct back (if a queued message is returned the message struct will be different).
380 */
381 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
382         uta_ctx_t*      ctx;
383         rmr_mbuf_t*     qm;                             // message that was queued on the ring
384
385         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
386                 errno = EINVAL;
387                 if( old_msg != NULL ) {
388                         old_msg->state = RMR_ERR_BADARG;
389                         old_msg->tp_state = errno;
390                 }
391                 return old_msg;
392         }
393         errno = 0;
394
395         return rmr_mt_rcv( ctx, old_msg, -1 );
396 }
397
398 /*
399         This allows a timeout based receive for applications unable to implement epoll_wait()
400         (e.g. wrappers).
401 */
402 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
403         uta_ctx_t*      ctx;
404
405         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
406                 errno = EINVAL;
407                 if( old_msg != NULL ) {
408                         old_msg->state = RMR_ERR_BADARG;
409                         old_msg->tp_state = errno;
410                 }
411                 return old_msg;
412         }
413
414         return rmr_mt_rcv( ctx, old_msg, ms_to );
415 }
416
417 /*
418         This blocks until the message with the 'expect' ID is received. Messages which are received
419         before the expected message are queued onto the message ring.  The function will return
420         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
421         expected message is received. If the queued message ring fills a nil pointer is returned
422         and errno is set to ENOBUFS.
423
424         Generally this will be invoked only by the call() function as it waits for a response, but
425         it is exposed to the user application as three is no reason not to.
426 */
427 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
428         uta_ctx_t*      ctx;
429         int     queued = 0;                             // number we pushed into the ring
430         int     exp_len = 0;                    // length of expected ID
431
432         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
433                 errno = EINVAL;
434                 if( msg != NULL ) {
435                         msg->state = RMR_ERR_BADARG;
436                         msg->tp_state = errno;
437                 }
438                 return msg;
439         }
440
441         errno = 0;
442
443         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
444                 return rmr_rcv_msg( ctx, msg );
445         }
446
447         exp_len = strlen( expect );
448         if( exp_len > RMR_MAX_XID ) {
449                 exp_len = RMR_MAX_XID;
450         }
451         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
452
453         while( queued < allow2queue ) {
454                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
455                 if( msg->state == RMR_OK ) {
456                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
457                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
458                                 return msg;
459                         }
460
461                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
462                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
463                                 errno = ENOBUFS;
464                                 return NULL;
465                         }
466
467                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
468                         queued++;
469                         msg = NULL;
470                 }
471         }
472
473         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
474         errno = ETIMEDOUT;
475         return NULL;
476 }
477
478 /*
479         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
480         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
481         mechnism indicates eagain or etimeedout.  All other error conditions are reported
482         without this delay. Setting a timeout of 0 causes no retries to be attempted in
483         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
484         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
485         after every 1K send attempts until the "time" value is reached. Retries are abandoned
486         if NNG returns anything other than EAGAIN or EINTER is returned.
487
488         The default, if this function is not used, is 1; meaning that RMr will retry, but will
489         not enter a sleep.  In all cases the caller should check the status in the message returned
490         after a send call.
491
492         Returns -1 if the context was invalid; RMR_OK otherwise.
493 */
494 extern int rmr_set_stimeout( void* vctx, int time ) {
495         uta_ctx_t*      ctx;
496
497         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
498                 return -1;
499         }
500
501         if( time < 0 ) {
502                 time = 0;
503         }
504
505         ctx->send_retries = time;
506         return RMR_OK;
507 }
508
509 /*
510         Set receive timeout -- not supported in nng implementation
511
512         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
513 */
514 extern int rmr_set_rtimeout( void* vctx, int time ) {
515         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
516         return 0;
517 }
518
519
520 /*
521         This is the actual init workhorse. The user visible function meerly ensures that the
522         calling programme does NOT set any internal flags that are supported, and then
523         invokes this.  Internal functions (the route table collector) which need additional
524         open ports without starting additional route table collectors, will invoke this
525         directly with the proper flag.
526
527         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
528                                 that we know about. The _user_ should ensure that the supplied length also
529                                 includes the trace data length maximum as they are in control of that.
530 */
531 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
532         static  int announced = 0;
533         uta_ctx_t*      ctx = NULL;
534         char    bind_info[256];                         // bind info
535         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
536         char*   port;
537         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
538         char*   proto_port;
539         char    wbuf[1024];                                     // work buffer
540         char*   tok;                                            // pointer at token in a buffer
541         char*   tok2;
542         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
543         int             state;
544         int             i;
545         int             old_vlevel;
546
547         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
548         rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
549
550         if( ! announced ) {
551                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/c mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
552                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
553                 announced = 1;
554         }
555         rmr_set_vlevel( old_vlevel );           // return logging to the desired state
556
557         errno = 0;
558         if( uproto_port == NULL ) {
559                 proto_port = strdup( DEF_COMM_PORT );
560         } else {
561                 proto_port = strdup( uproto_port );             // so we can modify it
562         }
563
564         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
565                 errno = ENOMEM;
566                 return NULL;
567         }
568         memset( ctx, 0, sizeof( uta_ctx_t ) );
569
570         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
571         ctx->nrivers = 256;                                                             // number of input flows we'll manage
572         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
573         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
574         for( i = 0; i < ctx->nrivers; i++ ) {
575                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
576         }
577
578         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
579         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
580         ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
581         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64;          // add in our header size and a bit of fudge
582
583         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
584         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
585
586         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
587                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
588                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
589         } else {
590                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
591         }
592         init_mtcall( ctx );                                                             // set up call chutes
593         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
594
595
596         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
597         if( max_msg_size > 0 ) {
598                 ctx->max_plen = max_msg_size;
599         }
600
601         // we're using a listener to get rtg updates, so we do NOT need this.
602         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
603
604         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
605         if( ctx->si_ctx == NULL ) {
606                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
607                 free_ctx( ctx );
608                 return NULL;
609         }
610
611         if( (port = strchr( proto_port, ':' )) != NULL ) {
612                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
613                         port++;
614                 } else {
615                         *(port++) = 0;                  // term proto string and point at port string
616                         proto = proto_port;             // user supplied proto so point at it rather than default
617                 }
618         } else {
619                 port = proto_port;                      // assume something like "1234" was passed
620         }
621
622         if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
623                 if( atoi( tok ) < 1 ) {
624                         static_rtc = 1;
625                 }
626         }
627
628         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
629                 tok = strdup( tok );                                    // something we can destroy
630                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
631                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
632                 } else {
633                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
634                 }
635                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
636                         *tok2 = 0;                                                      // make it so
637                 }
638
639                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
640                 free( tok );
641         } else {
642                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
643                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
644                         return NULL;
645                 }
646                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
647                         *tok = 0;                                                                       // we don't keep domain portion
648                 }
649         }
650
651         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
652         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
653                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
654                 return NULL;
655         }
656
657         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
658                 if( atoi( tok ) > 0 ) {
659                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
660                 }
661         }
662
663         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
664         if( flags & RMRFL_NAME_ONLY ) {
665                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
666         } else {
667                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
668                 if( ctx->my_ip == NULL ) {
669                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
670                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
671                 }
672         }
673         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
674
675         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
676                 if( *tok == '1' ) {
677                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
678                 }
679         }
680
681
682         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
683                 interface = "0.0.0.0";
684         }
685         
686         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
687         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
688                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
689                 free_ctx( ctx );
690                 return NULL;
691         }
692
693         if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need a RTC
694                 if( static_rtc ) {
695                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
696                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
697                         }
698                 } else {
699                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
700                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
701                         }
702                 }
703         }
704
705         ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
706         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
707                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
708         }
709
710         free( proto_port );
711         return (void *) ctx;
712 }
713
714 /*
715         Initialise the message routing environment. Flags are one of the UTAFL_
716         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
717         (tcp) to be used, then :port is all that is needed.
718
719         At the moment it seems that TCP really is the only viable protocol, but
720         we'll allow flexibility.
721
722         The return value is a void pointer which must be passed to most uta functions. On
723         error, a nil pointer is returned and errno should be set.
724
725         Flags:
726                 No user flags supported (needed) at the moment, but this provides for extension
727                 without drastically changing anything. The user should invoke with RMRFL_NONE to
728                 avoid any misbehavour as there are internal flags which are suported
729 */
730 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
731         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
732 }
733
734 /*
735         This sets the default trace length which will be added to any message buffers
736         allocated.  It can be set at any time, and if rmr_set_trace() is given a
737         trace len that is different than the default allcoated in a message, the message
738         will be resized.
739
740         Returns 0 on failure and 1 on success. If failure, then errno will be set.
741 */
742 extern int rmr_init_trace( void* vctx, int tr_len ) {
743         uta_ctx_t* ctx;
744
745         errno = 0;
746         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
747                 errno = EINVAL;
748                 return 0;
749         }
750
751         ctx->trace_data_len = tr_len;
752         return 1;
753 }
754
755 /*
756         Return true if routing table is initialised etc. and app can send/receive.
757 */
758 extern int rmr_ready( void* vctx ) {
759         uta_ctx_t *ctx;
760
761         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
762                 return FALSE;
763         }
764
765         if( ctx->rtable != NULL ) {
766                 return TRUE;
767         }
768
769         return FALSE;
770 }
771
772 /*
773         This returns the message queue ring's filedescriptor which can be used for
774         calls to epoll.  The user shouild NOT read, write, or close the fd.
775
776         Returns the file descriptor or -1 on error.
777 */
778 extern int rmr_get_rcvfd( void* vctx ) {
779         uta_ctx_t* ctx;
780         int state;
781
782         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
783                 return -1;
784         }
785
786 /*
787         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
788                 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
789                 return -1;
790         }
791 */
792
793         return uta_ring_getpfd( ctx->mring );
794 }
795
796
797 /*
798         Clean up things.
799
800         There isn't an si_flush() per se, but we can pause, generate
801         a context switch, which should allow the last sent buffer to
802         flow. There isn't exactly an nng_term/close either, so there
803         isn't much we can do.
804 */
805 extern void rmr_close( void* vctx ) {
806         uta_ctx_t *ctx;
807
808         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
809                 return;
810         }
811
812         ctx->shutdown = 1;
813
814         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
815
816         // FIX ME -- how to we turn off si; close all sessions etc?
817         //SIclose( ctx->nn_sock );
818
819 }
820
821
822 // ----- multi-threaded call/receive support -------------------------------------------------
823
824 /*
825         Blocks on the receive ring chute semaphore and then reads from the ring
826         when it is tickled.  If max_wait is -1 then the function blocks until
827         a message is ready on the ring. Else max_wait is assumed to be the number
828         of millaseconds to wait before returning a timeout message.
829 */
830 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
831         uta_ctx_t*      ctx;
832         uta_mhdr_t*     hdr;                    // header in the transport buffer
833         chute_t*        chute;
834         struct timespec ts;                     // time info if we have a timeout
835         long    new_ms;                         // adjusted mu-sec
836         long    seconds = 0;            // max wait seconds
837         long    nano_sec;                       // max wait xlated to nano seconds
838         int             state;
839         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
840         
841         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
842                 errno = EINVAL;
843                 if( mbuf ) {
844                         mbuf->state = RMR_ERR_BADARG;
845                         mbuf->tp_state = errno;
846                 }
847                 return mbuf;
848         }
849
850         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
851
852         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
853
854         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
855                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
856                         if( ombuf ) {
857                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
858                         }       
859                 } else {
860                         mbuf = ombuf;                                           // return original if it was given with timeout status
861                         if( ombuf != NULL ) {
862                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
863                                 mbuf->len = 0;
864                         }
865                 }
866
867                 return mbuf;
868         }
869
870         if( ombuf ) {
871                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
872                 ombuf->len = 0;
873         }
874         if( max_wait > 0 ) {
875                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
876
877                 if( max_wait > 999 ) {
878                         seconds = max_wait / 1000;
879                         max_wait -= seconds * 1000;
880                         ts.tv_sec += seconds;
881                 }
882                 if( max_wait > 0 ) {
883                         nano_sec = max_wait * 1000000;
884                         ts.tv_nsec += nano_sec;
885                         if( ts.tv_nsec > 999999999 ) {
886                                 ts.tv_nsec -= 999999999;
887                                 ts.tv_sec++;
888                         }
889                 }
890
891                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
892         }
893
894         errno = EINTR;
895         state = -1;
896         while( state < 0 && errno == EINTR ) {
897                 if( seconds ) {
898                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
899                 } else {
900                         state = sem_wait( &chute->barrier );
901                 }
902         }
903
904         if( state < 0 ) {
905                 mbuf = ombuf;                           // return caller's buffer if they passed one in
906         } else {
907                 errno = 0;                                              // interrupted call state could be left; clear
908                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
909                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
910                         mbuf->state = RMR_OK;
911
912                         if( ombuf ) {
913                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
914                         }
915                 } else {
916                         errno = ETIMEDOUT;
917                         mbuf = ombuf;                           // no buffer, return user's if there
918                 }
919         }
920
921         if( mbuf ) {
922                 mbuf->tp_state = errno;
923         }
924         return mbuf;
925 }
926
927 /*
928         Accept a message buffer and caller ID, send the message and then wait
929         for the receiver to tickle the semaphore letting us know that a message
930         has been received. The call_id is a value between 2 and 255, inclusive; if
931         it's not in this range an error will be returned. Max wait is the amount
932         of time in millaseconds that the call should block for. If 0 is given
933         then no timeout is set.
934
935         If the mt_call feature has not been initialised, then the attempt to use this
936         funciton will fail with RMR_ERR_NOTSUPP
937
938         If no matching message is received before the max_wait period expires, a
939         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
940         occurs after the message has been sent, then a nil pointer is returned
941         with errno set to some other value.
942 */
943 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
944         rmr_mbuf_t* ombuf;                      // original mbuf passed in
945         uta_ctx_t*      ctx;
946         uta_mhdr_t*     hdr;                    // header in the transport buffer
947         chute_t*        chute;
948         unsigned char*  d1;                     // d1 data in header
949         struct timespec ts;                     // time info if we have a timeout
950         long    new_ms;                         // adjusted mu-sec
951         long    seconds = 0;            // max wait seconds
952         long    nano_sec;                       // max wait xlated to nano seconds
953         int             state;
954         
955         errno = EINVAL;
956         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
957                 if( mbuf ) {
958                         mbuf->tp_state = errno;
959                         mbuf->state = RMR_ERR_BADARG;
960                 }
961                 return mbuf;
962         }
963
964         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
965                 mbuf->state = RMR_ERR_NOTSUPP;
966                 mbuf->tp_state = errno;
967                 return mbuf;
968         }
969
970         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
971                 mbuf->state = RMR_ERR_BADARG;
972                 mbuf->tp_state = errno;
973                 return mbuf;
974         }
975
976         ombuf = mbuf;                                                                                                   // save to return timeout status with
977
978         chute = &ctx->chutes[call_id];
979         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
980                 rmr_free_msg( chute->mbuf );
981                 chute->mbuf = NULL;
982         }
983         
984         hdr = (uta_mhdr_t *) mbuf->header;
985         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
986         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
987         d1 = DATA1_ADDR( hdr );
988         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
989         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
990
991         if( max_wait >= 0 ) {
992                 clock_gettime( CLOCK_REALTIME, &ts );   
993
994                 if( max_wait > 999 ) {
995                         seconds = max_wait / 1000;
996                         max_wait -= seconds * 1000;
997                         ts.tv_sec += seconds;
998                 }
999                 if( max_wait > 0 ) {
1000                         nano_sec = max_wait * 1000000;
1001                         ts.tv_nsec += nano_sec;
1002                         if( ts.tv_nsec > 999999999 ) {
1003                                 ts.tv_nsec -= 999999999;
1004                                 ts.tv_sec++;
1005                         }
1006                 }
1007
1008                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1009         }
1010
1011         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1012         if( mbuf ) {
1013                 if( mbuf->state != RMR_OK ) {
1014                         mbuf->tp_state = errno;
1015                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1016                 }
1017         }
1018
1019         state = 0;
1020         errno = 0;
1021         while( chute->mbuf == NULL && ! errno ) {
1022                 if( seconds ) {
1023                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1024                 } else {
1025                         state = sem_wait( &chute->barrier );
1026                 }
1027
1028                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1029                         errno = 0;
1030                 }
1031
1032                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1033                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1034                                 rmr_free_msg( chute->mbuf );
1035                                 chute->mbuf = NULL;
1036                                 errno = 0;
1037                         }
1038                 }
1039         }
1040
1041         if( state < 0 ) {
1042                 return NULL;                                    // leave errno as set by sem wait call
1043         }
1044
1045         mbuf = chute->mbuf;
1046         mbuf->state = RMR_OK;
1047         chute->mbuf = NULL;
1048
1049         return mbuf;
1050 }
1051
1052 /*
1053         Given an existing message buffer, reallocate the payload portion to
1054         be at least new_len bytes.  The message header will remain such that
1055         the caller may use the rmr_rts_msg() function to return a payload
1056         to the sender. 
1057
1058         The mbuf passed in may or may not be reallocated and the caller must
1059         use the returned pointer and should NOT assume that it can use the 
1060         pointer passed in with the exceptions based on the clone flag.
1061
1062         If the clone flag is set, then a duplicated message, with larger payload
1063         size, is allocated and returned.  The old_msg pointer in this situation is
1064         still valid and must be explicitly freed by the application. If the clone 
1065         message is not set (0), then any memory management of the old message is
1066         handled by the function.
1067
1068         If the copy flag is set, the contents of the old message's payload is 
1069         copied to the reallocated payload.  If the flag is not set, then the 
1070         contents of the payload is undetermined.
1071 */
1072 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1073         if( old_msg == NULL ) {
1074                 return NULL;
1075         }
1076
1077         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1078 }
1079
1080 /*
1081         Enable low latency things in the transport (when supported).
1082 */
1083 extern void rmr_set_low_latency( void* vctx ) {
1084         uta_ctx_t*      ctx;
1085
1086         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1087                 if( ctx->si_ctx != NULL ) {
1088                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1089                 }
1090         }
1091 }
1092
1093 /*
1094         Turn on fast acks.
1095 */
1096 extern void rmr_set_fack( void* vctx ) {
1097         uta_ctx_t*      ctx;
1098
1099         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1100                 if( ctx->si_ctx != NULL ) {
1101                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1102                 }
1103         }
1104 }
1105