Add ability for RMR to request route table
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "rtc_static.c"                         // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c"                       // send/receive static functions
73 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
76
77
78 //------------------------------------------------------------------------------
79
80
81 /*
82         Clean up a context.
83 */
84 static void free_ctx( uta_ctx_t* ctx ) {
85         if( ctx ) {
86                 if( ctx->rtg_addr ) {
87                         free( ctx->rtg_addr );
88                 }
89         }
90 }
91
92 // --------------- public functions --------------------------------------------------------------------------
93
94 /*
95         Returns the size of the payload (bytes) that the msg buffer references.
96         Len in a message is the number of bytes which were received, or should
97         be transmitted, however, it is possible that the mbuf was allocated
98         with a larger payload space than the payload length indicates; this
99         function returns the absolute maximum space that the user has available
100         in the payload. On error (bad msg buffer) -1 is returned and errno should
101         indicate the rason.
102
103         The allocated len stored in the msg is:
104                 transport header length +
105                 message header + 
106                 user requested payload 
107
108         The msg header is a combination of the fixed RMR header and the variable
109         trace data and d2 fields which may vary for each message.
110 */
111 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
112         if( msg == NULL || msg->header == NULL ) {
113                 errno = EINVAL;
114                 return -1;
115         }
116
117         errno = 0;
118         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
119 }
120
121 /*
122         Allocates a send message as a zerocopy message allowing the underlying message protocol
123         to send the buffer without copy.
124 */
125 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
126         uta_ctx_t*      ctx;
127         rmr_mbuf_t*     m;
128
129         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
130                 return NULL;
131         }
132
133         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
134         return  m;
135 }
136
137
138 /*
139         Allocates a send message as a zerocopy message allowing the underlying message protocol
140         to send the buffer without copy. In addition, a trace data field of tr_size will be
141         added and the supplied data coppied to the buffer before returning the message to
142         the caller.
143 */
144 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
145         uta_ctx_t*      ctx;
146         rmr_mbuf_t*     m;
147         int state;
148
149         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
150                 return NULL;
151         }
152
153         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
154         if( m != NULL ) {
155                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
156                 if( state != tr_size ) {
157                         m->state = RMR_ERR_INITFAILED;
158                 }
159         }
160
161         return  m;
162 }
163
164 /*
165         This provides an external path to the realloc static function as it's called by an
166         outward facing mbuf api function. Used to reallocate a message with a different
167         trace data size.
168 */
169 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
170         return realloc_msg( msg, new_tr_size );
171 }
172
173
174 /*
175         Return the message to the available pool, or free it outright.
176 */
177 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
178         //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
179         //return;
180
181         if( mbuf == NULL ) {
182                 return;
183         }
184
185         if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
186                 if( mbuf->tp_buf ) {
187                         free( mbuf->tp_buf );
188                 }
189                 free( mbuf );
190         }
191 }
192
193 /*
194         This is a wrapper to the real timeout send. We must wrap it now to ensure that
195         the call flag and call-id are reset
196 */
197 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
198         char* d1;                                                                                                                       // point at the call-id in the header
199
200         if( msg != NULL ) {
201                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
202
203                 d1 = DATA1_ADDR( msg->header );
204                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
205         }       
206
207         return mtosend_msg( vctx, msg, max_to );
208 }
209
210 /*
211         Send with default max timeout as is set in the context.
212         See rmr_mtosend_msg() for more details on the parameters.
213         See rmr_stimeout() for info on setting the default timeout.
214 */
215 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
216         char* d1;                                                                                                               // point at the call-id in the header
217
218         if( msg != NULL ) {
219                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
220
221                 d1 = DATA1_ADDR( msg->header );
222                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
223         }       
224
225         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
226 }
227
228 /*
229         Return to sender allows a message to be sent back to the endpoint where it originated.
230
231         In the SI world the file descriptor that was the source of the message is captured in
232         the mbuffer and thus can be used to quickly find the target for an RTS call. 
233
234         The source information in the message is used to select the socket on which to write
235         the message rather than using the message type and round-robin selection. This
236         should return a message buffer with the state of the send operation set. On success
237         (state is RMR_OK, the caller may use the buffer for another receive operation), and on
238         error it can be passed back to this function to retry the send if desired. On error,
239         errno will liklely have the failure reason set by the nng send processing.
240         The following are possible values for the state in the message buffer:
241
242         Message states returned:
243                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
244                 RMR_ERR_NOHDR  - message did not have a header
245                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
246                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
247                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
248
249         A nil message as the return value is rare, and generally indicates some kind of horrible
250         failure. The value of errno might give a clue as to what is wrong.
251
252         CAUTION:
253                 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
254                 The caller must check for this and handle it properly.
255 */
256 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
257         int                     nn_sock;                        // endpoint socket for send
258         uta_ctx_t*      ctx;
259         int                     state;
260         char*           hold_src;                       // we need the original source if send fails
261         char*           hold_ip;                        // also must hold original ip
262         int                     sock_ok = 0;            // true if we found a valid endpoint socket
263         endpoint_t*     ep = NULL;                      // end point to track counts
264
265         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
266                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
267                 if( msg != NULL ) {
268                         msg->state = RMR_ERR_BADARG;
269                         msg->tp_state = errno;
270                 }
271                 return msg;
272         }
273
274         errno = 0;                                                                                                              // at this point any bad state is in msg returned
275         if( msg->header == NULL ) {
276                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
277                 msg->state = RMR_ERR_NOHDR;
278                 msg->tp_state = errno;
279                 return msg;
280         }
281
282         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
283
284 /*
285         sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
286         if( ! sock_ok ) {
287 */
288         if( (nn_sock = msg->rts_fd) < 0 ) {
289                 if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
290                         //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
291                         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
292                 }
293                 if( ! sock_ok ) {
294                         msg->state = RMR_ERR_NOENDPT;
295                         return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
296                 }
297         }
298
299         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
300         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
301         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
302         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
303         msg = send_msg( ctx, msg, nn_sock, -1 );
304         if( msg ) {
305                 if( ep != NULL ) {
306                         switch( msg->state ) {
307                                 case RMR_OK:
308                                         ep->scounts[EPSC_GOOD]++;
309                                         break;
310                         
311                                 case RMR_ERR_RETRY:
312                                         ep->scounts[EPSC_TRANS]++;
313                                         break;
314
315                                 default:
316                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
317                                         ep->scounts[EPSC_FAIL]++;
318                                         break;
319                         }
320                 }
321                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
322                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
323                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
324         }
325
326         free( hold_src );
327         free( hold_ip );
328         return msg;
329 }
330
331 /*
332         If multi-threading call is turned on, this invokes that mechanism with the special call
333         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
334         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
335         the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
336         a flexible timeout.
337
338         On timeout this function will return a nil pointer. If the original message could not
339         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
340
341         Original behavour:
342         Call sends the message based on message routing using the message type, and waits for a
343         response message to arrive with the same transaction id that was in the outgoing message.
344         If, while wiating for the expected response,  messages are received which do not have the
345         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
346         order that they were received.
347
348         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
349         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
350         may be resent (likely the context pointer was nil).  If the message is sent, but no
351         response is received, a nil message is returned with errno set to indicate the likley
352         issue:
353                 ETIMEDOUT -- too many messages were queued before reciving the expected response
354                 ENOBUFS -- the queued message ring is full, messages were dropped
355                 EINVAL  -- A parameter was not valid
356                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
357                                         user should call this function with the message again.
358
359 */
360 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
361         uta_ctx_t*              ctx;
362
363         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
364                 if( msg != NULL ) {
365                         msg->state = RMR_ERR_BADARG;
366                 }
367                 return msg;
368         }
369
370         return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
371 }
372
373 /*
374         The outward facing receive function. When invoked it will pop the oldest message
375         from the receive ring, if any are queued, and return it. If the ring is empty
376         then the receive function is invoked to wait for the next message to arrive (blocking).
377
378         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
379         nil, a new one will be allocated. However, the caller should NOT expect to get the same
380         struct back (if a queued message is returned the message struct will be different).
381 */
382 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
383         uta_ctx_t*      ctx;
384         rmr_mbuf_t*     qm;                             // message that was queued on the ring
385
386         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
387                 errno = EINVAL;
388                 if( old_msg != NULL ) {
389                         old_msg->state = RMR_ERR_BADARG;
390                         old_msg->tp_state = errno;
391                 }
392                 return old_msg;
393         }
394         errno = 0;
395
396         return rmr_mt_rcv( ctx, old_msg, -1 );
397 }
398
399 /*
400         This allows a timeout based receive for applications unable to implement epoll_wait()
401         (e.g. wrappers).
402 */
403 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
404         uta_ctx_t*      ctx;
405
406         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
407                 errno = EINVAL;
408                 if( old_msg != NULL ) {
409                         old_msg->state = RMR_ERR_BADARG;
410                         old_msg->tp_state = errno;
411                 }
412                 return old_msg;
413         }
414
415         return rmr_mt_rcv( ctx, old_msg, ms_to );
416 }
417
418 /*
419         This blocks until the message with the 'expect' ID is received. Messages which are received
420         before the expected message are queued onto the message ring.  The function will return
421         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
422         expected message is received. If the queued message ring fills a nil pointer is returned
423         and errno is set to ENOBUFS.
424
425         Generally this will be invoked only by the call() function as it waits for a response, but
426         it is exposed to the user application as three is no reason not to.
427 */
428 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
429         uta_ctx_t*      ctx;
430         int     queued = 0;                             // number we pushed into the ring
431         int     exp_len = 0;                    // length of expected ID
432
433         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
434                 errno = EINVAL;
435                 if( msg != NULL ) {
436                         msg->state = RMR_ERR_BADARG;
437                         msg->tp_state = errno;
438                 }
439                 return msg;
440         }
441
442         errno = 0;
443
444         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
445                 return rmr_rcv_msg( ctx, msg );
446         }
447
448         exp_len = strlen( expect );
449         if( exp_len > RMR_MAX_XID ) {
450                 exp_len = RMR_MAX_XID;
451         }
452         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
453
454         while( queued < allow2queue ) {
455                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
456                 if( msg->state == RMR_OK ) {
457                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
458                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
459                                 return msg;
460                         }
461
462                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
463                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
464                                 errno = ENOBUFS;
465                                 return NULL;
466                         }
467
468                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
469                         queued++;
470                         msg = NULL;
471                 }
472         }
473
474         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
475         errno = ETIMEDOUT;
476         return NULL;
477 }
478
479 /*
480         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
481         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
482         mechnism indicates eagain or etimeedout.  All other error conditions are reported
483         without this delay. Setting a timeout of 0 causes no retries to be attempted in
484         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
485         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
486         after every 1K send attempts until the "time" value is reached. Retries are abandoned
487         if NNG returns anything other than EAGAIN or EINTER is returned.
488
489         The default, if this function is not used, is 1; meaning that RMr will retry, but will
490         not enter a sleep.  In all cases the caller should check the status in the message returned
491         after a send call.
492
493         Returns -1 if the context was invalid; RMR_OK otherwise.
494 */
495 extern int rmr_set_stimeout( void* vctx, int time ) {
496         uta_ctx_t*      ctx;
497
498         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
499                 return -1;
500         }
501
502         if( time < 0 ) {
503                 time = 0;
504         }
505
506         ctx->send_retries = time;
507         return RMR_OK;
508 }
509
510 /*
511         Set receive timeout -- not supported in nng implementation
512
513         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
514 */
515 extern int rmr_set_rtimeout( void* vctx, int time ) {
516         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
517         return 0;
518 }
519
520
521 /*
522         This is the actual init workhorse. The user visible function meerly ensures that the
523         calling programme does NOT set any internal flags that are supported, and then
524         invokes this.  Internal functions (the route table collector) which need additional
525         open ports without starting additional route table collectors, will invoke this
526         directly with the proper flag.
527
528         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
529                                 that we know about. The _user_ should ensure that the supplied length also
530                                 includes the trace data length maximum as they are in control of that.
531 */
532 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
533         static  int announced = 0;
534         uta_ctx_t*      ctx = NULL;
535         char    bind_info[256];                         // bind info
536         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
537         char*   port;
538         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
539         char*   proto_port;
540         char    wbuf[1024];                                     // work buffer
541         char*   tok;                                            // pointer at token in a buffer
542         char*   tok2;
543         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
544         int             state;
545         int             i;
546         int             old_vlevel;
547
548         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
549         rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
550
551         if( ! announced ) {
552                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/e mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
553                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
554                 announced = 1;
555         }
556         rmr_set_vlevel( old_vlevel );           // return logging to the desired state
557
558         errno = 0;
559         if( uproto_port == NULL ) {
560                 proto_port = strdup( DEF_COMM_PORT );
561         } else {
562                 proto_port = strdup( uproto_port );             // so we can modify it
563         }
564
565         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
566                 errno = ENOMEM;
567                 return NULL;
568         }
569         memset( ctx, 0, sizeof( uta_ctx_t ) );
570
571         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
572         ctx->nrivers = 256;                                                             // number of input flows we'll manage
573         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
574         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
575         for( i = 0; i < ctx->nrivers; i++ ) {
576                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
577         }
578
579         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
580         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
581         ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
582         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64;          // add in our header size and a bit of fudge
583
584         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
585         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
586
587         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
588                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
589                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
590         } else {
591                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
592         }
593         init_mtcall( ctx );                                                             // set up call chutes
594         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
595
596
597         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
598         if( max_msg_size > 0 ) {
599                 ctx->max_plen = max_msg_size;
600         }
601
602         // we're using a listener to get rtg updates, so we do NOT need this.
603         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
604
605         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
606         if( ctx->si_ctx == NULL ) {
607                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
608                 free_ctx( ctx );
609                 return NULL;
610         }
611
612         if( (port = strchr( proto_port, ':' )) != NULL ) {
613                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
614                         port++;
615                 } else {
616                         *(port++) = 0;                  // term proto string and point at port string
617                         proto = proto_port;             // user supplied proto so point at it rather than default
618                 }
619         } else {
620                 port = proto_port;                      // assume something like "1234" was passed
621         }
622
623         if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
624                 if( atoi( tok ) < 1 ) {
625                         static_rtc = 1;
626                 }
627         }
628
629         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
630                 tok = strdup( tok );                                    // something we can destroy
631                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
632                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
633                 } else {
634                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
635                 }
636                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
637                         *tok2 = 0;                                                      // make it so
638                 }
639
640                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
641                 free( tok );
642         } else {
643                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
644                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
645                         return NULL;
646                 }
647                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
648                         *tok = 0;                                                                       // we don't keep domain portion
649                 }
650         }
651
652         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
653         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
654                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
655                 return NULL;
656         }
657
658         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
659                 if( atoi( tok ) > 0 ) {
660                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
661                 }
662         }
663
664         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
665         if( flags & RMRFL_NAME_ONLY ) {
666                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
667         } else {
668                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
669                 if( ctx->my_ip == NULL ) {
670                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
671                         strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
672                 }
673         }
674         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
675
676         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
677                 if( *tok == '1' ) {
678                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
679                 }
680         }
681
682
683         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
684                 interface = "0.0.0.0";
685         }
686         
687         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
688         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
689                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
690                 free_ctx( ctx );
691                 return NULL;
692         }
693
694         if( flags & FL_NOTHREAD ) {                                     // thread set to off; no rout table collector started (could be called by the rtc thread itself)
695                 ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
696         } else {
697                 if( static_rtc ) {
698                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
699                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
700                         }
701                 } else {
702                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
703                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
704                         }
705                 }
706         }
707
708         ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
709         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
710                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
711         }
712
713         free( proto_port );
714         return (void *) ctx;
715 }
716
717 /*
718         Initialise the message routing environment. Flags are one of the UTAFL_
719         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
720         (tcp) to be used, then :port is all that is needed.
721
722         At the moment it seems that TCP really is the only viable protocol, but
723         we'll allow flexibility.
724
725         The return value is a void pointer which must be passed to most uta functions. On
726         error, a nil pointer is returned and errno should be set.
727
728         Flags:
729                 No user flags supported (needed) at the moment, but this provides for extension
730                 without drastically changing anything. The user should invoke with RMRFL_NONE to
731                 avoid any misbehavour as there are internal flags which are suported
732 */
733 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
734         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
735 }
736
737 /*
738         This sets the default trace length which will be added to any message buffers
739         allocated.  It can be set at any time, and if rmr_set_trace() is given a
740         trace len that is different than the default allcoated in a message, the message
741         will be resized.
742
743         Returns 0 on failure and 1 on success. If failure, then errno will be set.
744 */
745 extern int rmr_init_trace( void* vctx, int tr_len ) {
746         uta_ctx_t* ctx;
747
748         errno = 0;
749         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
750                 errno = EINVAL;
751                 return 0;
752         }
753
754         ctx->trace_data_len = tr_len;
755         return 1;
756 }
757
758 /*
759         Return true if routing table is initialised etc. and app can send/receive.
760 */
761 extern int rmr_ready( void* vctx ) {
762         uta_ctx_t *ctx;
763
764         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
765                 return FALSE;
766         }
767
768         if( ctx->rtable != NULL ) {
769                 return TRUE;
770         }
771
772         return FALSE;
773 }
774
775 /*
776         This returns the message queue ring's filedescriptor which can be used for
777         calls to epoll.  The user shouild NOT read, write, or close the fd.
778
779         Returns the file descriptor or -1 on error.
780 */
781 extern int rmr_get_rcvfd( void* vctx ) {
782         uta_ctx_t* ctx;
783         int state;
784
785         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
786                 return -1;
787         }
788
789 /*
790         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
791                 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
792                 return -1;
793         }
794 */
795
796         return uta_ring_getpfd( ctx->mring );
797 }
798
799
800 /*
801         Clean up things.
802
803         There isn't an si_flush() per se, but we can pause, generate
804         a context switch, which should allow the last sent buffer to
805         flow. There isn't exactly an nng_term/close either, so there
806         isn't much we can do.
807 */
808 extern void rmr_close( void* vctx ) {
809         uta_ctx_t *ctx;
810
811         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
812                 return;
813         }
814
815         ctx->shutdown = 1;
816
817         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
818
819         // FIX ME -- how to we turn off si; close all sessions etc?
820         //SIclose( ctx->nn_sock );
821
822 }
823
824
825 // ----- multi-threaded call/receive support -------------------------------------------------
826
827 /*
828         Blocks on the receive ring chute semaphore and then reads from the ring
829         when it is tickled.  If max_wait is -1 then the function blocks until
830         a message is ready on the ring. Else max_wait is assumed to be the number
831         of millaseconds to wait before returning a timeout message.
832 */
833 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
834         uta_ctx_t*      ctx;
835         uta_mhdr_t*     hdr;                    // header in the transport buffer
836         chute_t*        chute;
837         struct timespec ts;                     // time info if we have a timeout
838         long    new_ms;                         // adjusted mu-sec
839         long    seconds = 0;            // max wait seconds
840         long    nano_sec;                       // max wait xlated to nano seconds
841         int             state;
842         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
843         
844         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
845                 errno = EINVAL;
846                 if( mbuf ) {
847                         mbuf->state = RMR_ERR_BADARG;
848                         mbuf->tp_state = errno;
849                 }
850                 return mbuf;
851         }
852
853         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
854
855         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
856
857         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
858                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
859                         if( ombuf ) {
860                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
861                         }       
862                 } else {
863                         mbuf = ombuf;                                           // return original if it was given with timeout status
864                         if( ombuf != NULL ) {
865                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
866                                 mbuf->len = 0;
867                         }
868                 }
869
870                 return mbuf;
871         }
872
873         if( ombuf ) {
874                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
875                 ombuf->len = 0;
876         }
877         if( max_wait > 0 ) {
878                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
879
880                 if( max_wait > 999 ) {
881                         seconds = max_wait / 1000;
882                         max_wait -= seconds * 1000;
883                         ts.tv_sec += seconds;
884                 }
885                 if( max_wait > 0 ) {
886                         nano_sec = max_wait * 1000000;
887                         ts.tv_nsec += nano_sec;
888                         if( ts.tv_nsec > 999999999 ) {
889                                 ts.tv_nsec -= 999999999;
890                                 ts.tv_sec++;
891                         }
892                 }
893
894                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
895         }
896
897         errno = EINTR;
898         state = -1;
899         while( state < 0 && errno == EINTR ) {
900                 if( seconds ) {
901                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
902                 } else {
903                         state = sem_wait( &chute->barrier );
904                 }
905         }
906
907         if( state < 0 ) {
908                 mbuf = ombuf;                           // return caller's buffer if they passed one in
909         } else {
910                 errno = 0;                                              // interrupted call state could be left; clear
911                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
912                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
913                         mbuf->state = RMR_OK;
914
915                         if( ombuf ) {
916                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
917                         }
918                 } else {
919                         errno = ETIMEDOUT;
920                         mbuf = ombuf;                           // no buffer, return user's if there
921                 }
922         }
923
924         if( mbuf ) {
925                 mbuf->tp_state = errno;
926         }
927         return mbuf;
928 }
929
930 /*
931         Accept a message buffer and caller ID, send the message and then wait
932         for the receiver to tickle the semaphore letting us know that a message
933         has been received. The call_id is a value between 2 and 255, inclusive; if
934         it's not in this range an error will be returned. Max wait is the amount
935         of time in millaseconds that the call should block for. If 0 is given
936         then no timeout is set.
937
938         If the mt_call feature has not been initialised, then the attempt to use this
939         funciton will fail with RMR_ERR_NOTSUPP
940
941         If no matching message is received before the max_wait period expires, a
942         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
943         occurs after the message has been sent, then a nil pointer is returned
944         with errno set to some other value.
945 */
946 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
947         rmr_mbuf_t* ombuf;                      // original mbuf passed in
948         uta_ctx_t*      ctx;
949         uta_mhdr_t*     hdr;                    // header in the transport buffer
950         chute_t*        chute;
951         unsigned char*  d1;                     // d1 data in header
952         struct timespec ts;                     // time info if we have a timeout
953         long    new_ms;                         // adjusted mu-sec
954         long    seconds = 0;            // max wait seconds
955         long    nano_sec;                       // max wait xlated to nano seconds
956         int             state;
957         
958         errno = EINVAL;
959         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
960                 if( mbuf ) {
961                         mbuf->tp_state = errno;
962                         mbuf->state = RMR_ERR_BADARG;
963                 }
964                 return mbuf;
965         }
966
967         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
968                 mbuf->state = RMR_ERR_NOTSUPP;
969                 mbuf->tp_state = errno;
970                 return mbuf;
971         }
972
973         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
974                 mbuf->state = RMR_ERR_BADARG;
975                 mbuf->tp_state = errno;
976                 return mbuf;
977         }
978
979         ombuf = mbuf;                                                                                                   // save to return timeout status with
980
981         chute = &ctx->chutes[call_id];
982         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
983                 rmr_free_msg( chute->mbuf );
984                 chute->mbuf = NULL;
985         }
986         
987         hdr = (uta_mhdr_t *) mbuf->header;
988         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
989         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
990         d1 = DATA1_ADDR( hdr );
991         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
992         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
993
994         if( max_wait >= 0 ) {
995                 clock_gettime( CLOCK_REALTIME, &ts );   
996
997                 if( max_wait > 999 ) {
998                         seconds = max_wait / 1000;
999                         max_wait -= seconds * 1000;
1000                         ts.tv_sec += seconds;
1001                 }
1002                 if( max_wait > 0 ) {
1003                         nano_sec = max_wait * 1000000;
1004                         ts.tv_nsec += nano_sec;
1005                         if( ts.tv_nsec > 999999999 ) {
1006                                 ts.tv_nsec -= 999999999;
1007                                 ts.tv_sec++;
1008                         }
1009                 }
1010
1011                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1012         }
1013
1014         mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
1015         if( mbuf ) {
1016                 if( mbuf->state != RMR_OK ) {
1017                         mbuf->tp_state = errno;
1018                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1019                 }
1020         }
1021
1022         state = 0;
1023         errno = 0;
1024         while( chute->mbuf == NULL && ! errno ) {
1025                 if( seconds ) {
1026                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1027                 } else {
1028                         state = sem_wait( &chute->barrier );
1029                 }
1030
1031                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1032                         errno = 0;
1033                 }
1034
1035                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1036                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1037                                 rmr_free_msg( chute->mbuf );
1038                                 chute->mbuf = NULL;
1039                                 errno = 0;
1040                         }
1041                 }
1042         }
1043
1044         if( state < 0 ) {
1045                 return NULL;                                    // leave errno as set by sem wait call
1046         }
1047
1048         mbuf = chute->mbuf;
1049         mbuf->state = RMR_OK;
1050         chute->mbuf = NULL;
1051
1052         return mbuf;
1053 }
1054
1055 /*
1056         Given an existing message buffer, reallocate the payload portion to
1057         be at least new_len bytes.  The message header will remain such that
1058         the caller may use the rmr_rts_msg() function to return a payload
1059         to the sender. 
1060
1061         The mbuf passed in may or may not be reallocated and the caller must
1062         use the returned pointer and should NOT assume that it can use the 
1063         pointer passed in with the exceptions based on the clone flag.
1064
1065         If the clone flag is set, then a duplicated message, with larger payload
1066         size, is allocated and returned.  The old_msg pointer in this situation is
1067         still valid and must be explicitly freed by the application. If the clone 
1068         message is not set (0), then any memory management of the old message is
1069         handled by the function.
1070
1071         If the copy flag is set, the contents of the old message's payload is 
1072         copied to the reallocated payload.  If the flag is not set, then the 
1073         contents of the payload is undetermined.
1074 */
1075 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1076         if( old_msg == NULL ) {
1077                 return NULL;
1078         }
1079
1080         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1081 }
1082
1083 /*
1084         Enable low latency things in the transport (when supported).
1085 */
1086 extern void rmr_set_low_latency( void* vctx ) {
1087         uta_ctx_t*      ctx;
1088
1089         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1090                 if( ctx->si_ctx != NULL ) {
1091                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1092                 }
1093         }
1094 }
1095
1096 /*
1097         Turn on fast acks.
1098 */
1099 extern void rmr_set_fack( void* vctx ) {
1100         uta_ctx_t*      ctx;
1101
1102         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1103                 if( ctx->si_ctx != NULL ) {
1104                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1105                 }
1106         }
1107 }
1108