Fix SI address and initialistion bugs
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
1 // vim: ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rmr_si.c
23         Abstract:       This is the compile point for the si version of the rmr
24                                 library (formarly known as uta, so internal function names
25                                 are likely still uta_*)
26
27                                 With the exception of the symtab portion of the library,
28                                 RMr is built with a single compile so as to "hide" the
29                                 internal functions as statics.  Because they interdepend
30                                 on each other, and CMake has issues with generating two
31                                 different wormhole objects from a single source, we just
32                                 pull it all together with a centralised comple using
33                                 includes.
34
35                                 Future:  the API functions at this point can be separated
36                                 into a common source module.
37
38         Author:         E. Scott Daniels
39         Date:           1 February 2019
40 */
41
42 #include <ctype.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <netdb.h>
46 #include <errno.h>
47 #include <string.h>
48 #include <errno.h>
49 #include <pthread.h>
50 #include <unistd.h>
51 #include <time.h>
52 #include <arpa/inet.h>
53 #include <semaphore.h>
54 #include <pthread.h>
55
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
58
59 #define SI95_BUILD      1                       // we drop some common functions for si
60
61 #include "rmr.h"                                // things the users see
62 #include "rmr_agnostic.h"               // agnostic things (must be included before private)
63 #include "rmr_si_private.h"             // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
66
67 #include "ring_static.c"                        // message ring support
68 #include "rt_generic_static.c"          // route table things not transport specific
69 #include "rtable_si_static.c"           // route table things -- transport specific
70 #include "rtc_static.c"                         // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c"                       // send/receive static functions
73 #include "wormholes.c"                          // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
76
77
78 //------------------------------------------------------------------------------
79
80
81 /*
82         Clean up a context.
83 */
84 static void free_ctx( uta_ctx_t* ctx ) {
85         if( ctx ) {
86                 if( ctx->rtg_addr ) {
87                         free( ctx->rtg_addr );
88                 }
89         }
90 }
91
92 // --------------- public functions --------------------------------------------------------------------------
93
94 /*
95         Returns the size of the payload (bytes) that the msg buffer references.
96         Len in a message is the number of bytes which were received, or should
97         be transmitted, however, it is possible that the mbuf was allocated
98         with a larger payload space than the payload length indicates; this
99         function returns the absolute maximum space that the user has available
100         in the payload. On error (bad msg buffer) -1 is returned and errno should
101         indicate the rason.
102
103         The allocated len stored in the msg is:
104                 transport header length +
105                 message header +
106                 user requested payload
107
108         The msg header is a combination of the fixed RMR header and the variable
109         trace data and d2 fields which may vary for each message.
110 */
111 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
112         if( msg == NULL || msg->header == NULL ) {
113                 errno = EINVAL;
114                 return -1;
115         }
116
117         errno = 0;
118         return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN;        // allocated transport size less the header and other data bits
119 }
120
121 /*
122         Allocates a send message as a zerocopy message allowing the underlying message protocol
123         to send the buffer without copy.
124 */
125 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
126         uta_ctx_t*      ctx;
127         rmr_mbuf_t*     m;
128
129         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
130                 return NULL;
131         }
132
133         m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
134         return  m;
135 }
136
137
138 /*
139         Allocates a send message as a zerocopy message allowing the underlying message protocol
140         to send the buffer without copy. In addition, a trace data field of tr_size will be
141         added and the supplied data coppied to the buffer before returning the message to
142         the caller.
143 */
144 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
145         uta_ctx_t*      ctx;
146         rmr_mbuf_t*     m;
147         int state;
148
149         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
150                 return NULL;
151         }
152
153         m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
154         if( m != NULL ) {
155                 state = rmr_set_trace( m, data, tr_size );                              // roll their data in
156                 if( state != tr_size ) {
157                         m->state = RMR_ERR_INITFAILED;
158                 }
159         }
160
161         return  m;
162 }
163
164 /*
165         This provides an external path to the realloc static function as it's called by an
166         outward facing mbuf api function. Used to reallocate a message with a different
167         trace data size.
168 */
169 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
170         return realloc_msg( msg, new_tr_size );
171 }
172
173
174 /*
175         Return the message to the available pool, or free it outright.
176 */
177 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
178         //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
179         //return;
180
181         if( mbuf == NULL ) {
182                 return;
183         }
184
185         if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
186                 if( mbuf->tp_buf ) {
187                         free( mbuf->tp_buf );
188                         mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
189                 }
190
191                 mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
192                 free( mbuf );
193         }
194 }
195
196 /*
197         This is a wrapper to the real timeout send. We must wrap it now to ensure that
198         the call flag and call-id are reset
199 */
200 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
201         char* d1;                                                                                                                       // point at the call-id in the header
202
203         if( msg != NULL ) {
204                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
205
206                 d1 = DATA1_ADDR( msg->header );
207                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
208         }
209
210         return mtosend_msg( vctx, msg, max_to );
211 }
212
213 /*
214         Send with default max timeout as is set in the context.
215         See rmr_mtosend_msg() for more details on the parameters.
216         See rmr_stimeout() for info on setting the default timeout.
217 */
218 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
219         char* d1;                                                                                                               // point at the call-id in the header
220
221         if( msg != NULL ) {
222                 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
223
224                 d1 = DATA1_ADDR( msg->header );
225                 d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
226         }
227
228         return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
229 }
230
231 /*
232         Return to sender allows a message to be sent back to the endpoint where it originated.
233
234         With SI95 it was thought that the return to sender would be along the same open conneciton
235         and thus no table lookup would be needed to open a 'reverse direction' path. However, for
236         applications sending at high message rates, returning responses on the same connection
237         causes major strife. Thus the decision was made to use the same method as NNG and just
238         open a second connection for reverse path.
239
240         We will attempt to use the name in the received message to look up the endpoint. If
241         that failes, then we will write on the connection that the message arrived on as a
242         falback.
243
244         On success (state is RMR_OK, the caller may use the buffer for another receive operation),
245         and on error it can be passed back to this function to retry the send if desired. On error,
246         errno will liklely have the failure reason set by the nng send processing.  The following
247         are possible values for the state in the message buffer:
248
249         Message states returned:
250                 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
251                 RMR_ERR_NOHDR  - message did not have a header
252                 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
253                 RMR_ERR_SENDFAILED - send failed; errno has nano error code
254                 RMR_ERR_RETRY   - the reqest failed but should be retried (EAGAIN)
255
256         A nil message as the return value is rare, and generally indicates some kind of horrible
257         failure. The value of errno might give a clue as to what is wrong.
258
259         CAUTION:
260                 Like send_msg(), this is non-blocking and will return the msg if there is an error.
261                 The caller must check for this and handle it properly.
262 */
263 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
264         int                     nn_sock;                        // endpoint socket for send
265         uta_ctx_t*      ctx;
266         int                     state;
267         char*           hold_src;                       // we need the original source if send fails
268         char*           hold_ip;                        // also must hold original ip
269         int                     sock_ok = 0;            // true if we found a valid endpoint socket
270         endpoint_t*     ep = NULL;                      // end point to track counts
271
272         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
273                 errno = EINVAL;                                                                                         // if msg is null, this is their clue
274                 if( msg != NULL ) {
275                         msg->state = RMR_ERR_BADARG;
276                         msg->tp_state = errno;
277                 }
278                 return msg;
279         }
280
281         errno = 0;                                                                                                              // at this point any bad state is in msg returned
282         if( msg->header == NULL ) {
283                 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
284                 msg->state = RMR_ERR_NOHDR;
285                 msg->tp_state = errno;
286                 return msg;
287         }
288
289         ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
290
291         sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
292         if( ! sock_ok ) {
293                 if( (nn_sock = msg->rts_fd) < 0 ) {
294                         if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
295                                 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
296                         }
297                         if( ! sock_ok ) {
298                                 msg->state = RMR_ERR_NOENDPT;
299                                 return msg;
300                         }
301                 }
302         }
303
304
305         msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
306         hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
307         hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
308         strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
309         msg = send_msg( ctx, msg, nn_sock, -1 );
310         if( msg ) {
311                 if( ep != NULL ) {
312                         switch( msg->state ) {
313                                 case RMR_OK:
314                                         ep->scounts[EPSC_GOOD]++;
315                                         break;
316
317                                 case RMR_ERR_RETRY:
318                                         ep->scounts[EPSC_TRANS]++;
319                                         break;
320
321                                 default:
322                                         // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
323                                         ep->scounts[EPSC_FAIL]++;
324                                         break;
325                         }
326                 }
327                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
328                 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
329                 msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
330         }
331
332         free( hold_src );
333         free( hold_ip );
334         return msg;
335 }
336
337 /*
338         If multi-threading call is turned on, this invokes that mechanism with the special call
339         id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
340         behavour (described below) is carried out.  This is safe to use when mt is enabled, but
341         the user app is invoking rmr_call() from only one thread, and the caller doesn't need
342         a flexible timeout.
343
344         On timeout this function will return a nil pointer. If the original message could not
345         be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
346
347         Original behavour:
348         Call sends the message based on message routing using the message type, and waits for a
349         response message to arrive with the same transaction id that was in the outgoing message.
350         If, while wiating for the expected response,  messages are received which do not have the
351         desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
352         order that they were received.
353
354         Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
355         to ensure that no error was encountered. If the state is UTA_BADARG, then the message
356         may be resent (likely the context pointer was nil).  If the message is sent, but no
357         response is received, a nil message is returned with errno set to indicate the likley
358         issue:
359                 ETIMEDOUT -- too many messages were queued before reciving the expected response
360                 ENOBUFS -- the queued message ring is full, messages were dropped
361                 EINVAL  -- A parameter was not valid
362                 EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
363                                         user should call this function with the message again.
364
365 */
366 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
367         uta_ctx_t*              ctx;
368
369         if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
370                 if( msg != NULL ) {
371                         msg->state = RMR_ERR_BADARG;
372                 }
373                 return msg;
374         }
375
376         return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
377 }
378
379 /*
380         The outward facing receive function. When invoked it will pop the oldest message
381         from the receive ring, if any are queued, and return it. If the ring is empty
382         then the receive function is invoked to wait for the next message to arrive (blocking).
383
384         If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
385         nil, a new one will be allocated. However, the caller should NOT expect to get the same
386         struct back (if a queued message is returned the message struct will be different).
387 */
388 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
389         uta_ctx_t*      ctx;
390         rmr_mbuf_t*     qm;                             // message that was queued on the ring
391
392         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
393                 errno = EINVAL;
394                 if( old_msg != NULL ) {
395                         old_msg->state = RMR_ERR_BADARG;
396                         old_msg->tp_state = errno;
397                 }
398                 return old_msg;
399         }
400         errno = 0;
401
402         return rmr_mt_rcv( ctx, old_msg, -1 );
403 }
404
405 /*
406         This allows a timeout based receive for applications unable to implement epoll_wait()
407         (e.g. wrappers).
408 */
409 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
410         uta_ctx_t*      ctx;
411
412         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
413                 errno = EINVAL;
414                 if( old_msg != NULL ) {
415                         old_msg->state = RMR_ERR_BADARG;
416                         old_msg->tp_state = errno;
417                 }
418                 return old_msg;
419         }
420
421         return rmr_mt_rcv( ctx, old_msg, ms_to );
422 }
423
424 /*
425         This blocks until the message with the 'expect' ID is received. Messages which are received
426         before the expected message are queued onto the message ring.  The function will return
427         a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
428         expected message is received. If the queued message ring fills a nil pointer is returned
429         and errno is set to ENOBUFS.
430
431         Generally this will be invoked only by the call() function as it waits for a response, but
432         it is exposed to the user application as three is no reason not to.
433 */
434 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
435         uta_ctx_t*      ctx;
436         int     queued = 0;                             // number we pushed into the ring
437         int     exp_len = 0;                    // length of expected ID
438
439         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
440                 errno = EINVAL;
441                 if( msg != NULL ) {
442                         msg->state = RMR_ERR_BADARG;
443                         msg->tp_state = errno;
444                 }
445                 return msg;
446         }
447
448         errno = 0;
449
450         if( expect == NULL || ! *expect ) {                             // nothing expected if nil or empty string, just receive
451                 return rmr_rcv_msg( ctx, msg );
452         }
453
454         exp_len = strlen( expect );
455         if( exp_len > RMR_MAX_XID ) {
456                 exp_len = RMR_MAX_XID;
457         }
458         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
459
460         while( queued < allow2queue ) {
461                 msg = rcv_msg( ctx, msg );                                      // hard wait for next
462                 if( msg->state == RMR_OK ) {
463                         if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
464                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
465                                 return msg;
466                         }
467
468                         if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
469                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
470                                 errno = ENOBUFS;
471                                 return NULL;
472                         }
473
474                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
475                         queued++;
476                         msg = NULL;
477                 }
478         }
479
480         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
481         errno = ETIMEDOUT;
482         return NULL;
483 }
484
485 /*
486         Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
487         _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
488         mechnism indicates eagain or etimeedout.  All other error conditions are reported
489         without this delay. Setting a timeout of 0 causes no retries to be attempted in
490         RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
491         but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
492         after every 1K send attempts until the "time" value is reached. Retries are abandoned
493         if NNG returns anything other than EAGAIN or EINTER is returned.
494
495         The default, if this function is not used, is 1; meaning that RMr will retry, but will
496         not enter a sleep.  In all cases the caller should check the status in the message returned
497         after a send call.
498
499         Returns -1 if the context was invalid; RMR_OK otherwise.
500 */
501 extern int rmr_set_stimeout( void* vctx, int time ) {
502         uta_ctx_t*      ctx;
503
504         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
505                 return -1;
506         }
507
508         if( time < 0 ) {
509                 time = 0;
510         }
511
512         ctx->send_retries = time;
513         return RMR_OK;
514 }
515
516 /*
517         Set receive timeout -- not supported in nng implementation
518
519         CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
520 */
521 extern int rmr_set_rtimeout( void* vctx, int time ) {
522         rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
523         return 0;
524 }
525
526
527 /*
528         This is the actual init workhorse. The user visible function meerly ensures that the
529         calling programme does NOT set any internal flags that are supported, and then
530         invokes this.  Internal functions (the route table collector) which need additional
531         open ports without starting additional route table collectors, will invoke this
532         directly with the proper flag.
533
534         CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
535                                 that we know about. The _user_ should ensure that the supplied length also
536                                 includes the trace data length maximum as they are in control of that.
537 */
538 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
539         static  int announced = 0;
540         uta_ctx_t*      ctx = NULL;
541         char    bind_info[256];                         // bind info
542         char*   proto = "tcp";                          // pointer into the proto/port string user supplied
543         char*   port;
544         char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
545         char*   proto_port;
546         char    wbuf[1024];                                     // work buffer
547         char*   tok;                                            // pointer at token in a buffer
548         char*   tok2;
549         int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
550         int             state;
551         int             i;
552         int             old_vlevel;
553
554         old_vlevel = rmr_vlog_init();                   // initialise and get the current level
555         rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
556
557         if( ! announced ) {
558                 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
559                         RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
560                 announced = 1;
561         }
562         rmr_set_vlevel( old_vlevel );           // return logging to the desired state
563
564         errno = 0;
565         if( uproto_port == NULL ) {
566                 proto_port = strdup( DEF_COMM_PORT );
567         } else {
568                 proto_port = strdup( uproto_port );             // so we can modify it
569         }
570
571         if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
572                 errno = ENOMEM;
573                 return NULL;
574         }
575         memset( ctx, 0, sizeof( uta_ctx_t ) );
576
577         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
578         ctx->nrivers = 256;                                                             // number of input flows we'll manage
579         ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
580         memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
581         for( i = 0; i < ctx->nrivers; i++ ) {
582                 ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
583         }
584
585         ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
586         ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
587         ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
588         ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
589
590         ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
591         ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
592
593         if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
594                 uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
595                 uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
596         } else {
597                 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
598         }
599         init_mtcall( ctx );                                                             // set up call chutes
600         fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
601
602
603         ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
604         if( max_msg_size > 0 ) {
605                 ctx->max_plen = max_msg_size;
606         }
607
608         // we're using a listener to get rtg updates, so we do NOT need this.
609         //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
610
611         ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
612         if( ctx->si_ctx == NULL ) {
613                 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
614                 free_ctx( ctx );
615                 return NULL;
616         }
617
618         if( (port = strchr( proto_port, ':' )) != NULL ) {
619                 if( port == proto_port ) {              // ":1234" supplied; leave proto to default and point port correctly
620                         port++;
621                 } else {
622                         *(port++) = 0;                  // term proto string and point at port string
623                         proto = proto_port;             // user supplied proto so point at it rather than default
624                 }
625         } else {
626                 port = proto_port;                      // assume something like "1234" was passed
627         }
628
629         if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
630                 if( atoi( tok ) < 1 ) {
631                         static_rtc = 1;
632                 }
633         }
634
635         if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
636                 tok = strdup( tok );                                    // something we can destroy
637                 if( *tok == '[' ) {                                             // we allow an ipv6 address here
638                         tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
639                 } else {
640                         tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
641                 }
642                 if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
643                         *tok2 = 0;                                                      // make it so
644                 }
645
646                 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
647                 free( tok );
648         } else {
649                 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
650                         rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
651                         return NULL;
652                 }
653                 if( (tok = strchr( wbuf, '.' )) != NULL ) {
654                         *tok = 0;                                                                       // we don't keep domain portion
655                 }
656         }
657
658         ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
659         if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
660                 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
661                 return NULL;
662         }
663
664         if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
665                 if( atoi( tok ) > 0 ) {
666                         flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
667                 }
668         }
669
670         ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
671         if( flags & RMRFL_NAME_ONLY ) {
672                 ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
673         } else {
674                 ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
675                 if( ctx->my_ip == NULL ) {
676                         rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
677                         ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
678                 }
679         }
680         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
681
682         if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
683                 if( *tok == '1' ) {
684                         ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
685                 }
686         }
687
688
689         if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
690                 interface = "0.0.0.0";
691         }
692
693         snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
694         if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
695                 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
696                 free_ctx( ctx );
697                 return NULL;
698         }
699
700                                                                                                 // finish all flag setting before threads to keep helgrind quiet
701         ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
702
703         if( flags & RMRFL_NOTHREAD ) {                          // thread set to off; no route table collector started (could be called by the rtc thread itself)
704                 ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
705         } else {
706                 if( static_rtc ) {
707                         if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
708                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
709                         }
710                 } else {
711                         if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
712                                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
713                         }
714                 }
715         }
716
717         if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
718                 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
719         }
720
721         free( proto_port );
722         return (void *) ctx;
723 }
724
725 /*
726         Initialise the message routing environment. Flags are one of the UTAFL_
727         constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
728         (tcp) to be used, then :port is all that is needed.
729
730         At the moment it seems that TCP really is the only viable protocol, but
731         we'll allow flexibility.
732
733         The return value is a void pointer which must be passed to most uta functions. On
734         error, a nil pointer is returned and errno should be set.
735
736         Flags:
737                 No user flags supported (needed) at the moment, but this provides for extension
738                 without drastically changing anything. The user should invoke with RMRFL_NONE to
739                 avoid any misbehavour as there are internal flags which are suported
740 */
741 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
742         return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
743 }
744
745 /*
746         This sets the default trace length which will be added to any message buffers
747         allocated.  It can be set at any time, and if rmr_set_trace() is given a
748         trace len that is different than the default allcoated in a message, the message
749         will be resized.
750
751         Returns 0 on failure and 1 on success. If failure, then errno will be set.
752 */
753 extern int rmr_init_trace( void* vctx, int tr_len ) {
754         uta_ctx_t* ctx;
755
756         errno = 0;
757         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
758                 errno = EINVAL;
759                 return 0;
760         }
761
762         ctx->trace_data_len = tr_len;
763         return 1;
764 }
765
766 /*
767         Return true if routing table is initialised etc. and app can send/receive.
768 */
769 extern int rmr_ready( void* vctx ) {
770         uta_ctx_t *ctx;
771
772         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
773                 return FALSE;
774         }
775
776         if( ctx->rtable != NULL ) {
777                 return TRUE;
778         }
779
780         return FALSE;
781 }
782
783 /*
784         This returns the message queue ring's filedescriptor which can be used for
785         calls to epoll.  The user shouild NOT read, write, or close the fd.
786
787         Returns the file descriptor or -1 on error.
788 */
789 extern int rmr_get_rcvfd( void* vctx ) {
790         uta_ctx_t* ctx;
791         int state;
792
793         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
794                 return -1;
795         }
796
797 /*
798         if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
799                 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
800                 return -1;
801         }
802 */
803
804         return uta_ring_getpfd( ctx->mring );
805 }
806
807
808 /*
809         Clean up things.
810
811         There isn't an si_flush() per se, but we can pause, generate
812         a context switch, which should allow the last sent buffer to
813         flow. There isn't exactly an nng_term/close either, so there
814         isn't much we can do.
815 */
816 extern void rmr_close( void* vctx ) {
817         uta_ctx_t *ctx;
818
819         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
820                 return;
821         }
822
823         ctx->shutdown = 1;
824
825         SItp_stats( ctx->si_ctx );                      // dump some interesting stats
826
827         // FIX ME -- how to we turn off si; close all sessions etc?
828         //SIclose( ctx->nn_sock );
829
830 }
831
832
833 // ----- multi-threaded call/receive support -------------------------------------------------
834
835 /*
836         Blocks on the receive ring chute semaphore and then reads from the ring
837         when it is tickled.  If max_wait is -1 then the function blocks until
838         a message is ready on the ring. Else max_wait is assumed to be the number
839         of millaseconds to wait before returning a timeout message.
840 */
841 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
842         uta_ctx_t*      ctx;
843         uta_mhdr_t*     hdr;                    // header in the transport buffer
844         chute_t*        chute;
845         struct timespec ts;                     // time info if we have a timeout
846         long    new_ms;                         // adjusted mu-sec
847         long    seconds = 0;            // max wait seconds
848         long    nano_sec;                       // max wait xlated to nano seconds
849         int             state;
850         rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
851
852         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
853                 errno = EINVAL;
854                 if( mbuf ) {
855                         mbuf->state = RMR_ERR_BADARG;
856                         mbuf->tp_state = errno;
857                 }
858                 return mbuf;
859         }
860
861         ombuf = mbuf;           // if we timeout we must return original msg with status, so save it
862
863         chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
864
865         if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
866                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
867                         if( ombuf ) {
868                                 rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
869                         }
870                 } else {
871                         mbuf = ombuf;                                           // return original if it was given with timeout status
872                         if( ombuf != NULL ) {
873                                 mbuf->state = RMR_ERR_TIMEOUT;                  // preset if for failure
874                                 mbuf->len = 0;
875                         }
876                 }
877
878                 if( mbuf != NULL ) {
879                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
880                 }
881                 return mbuf;
882         }
883
884         if( ombuf ) {
885                 ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
886                 ombuf->len = 0;
887         }
888         if( max_wait > 0 ) {
889                 clock_gettime( CLOCK_REALTIME, &ts );   // sem timeout based on clock, not a delta
890
891                 if( max_wait > 999 ) {
892                         seconds = max_wait / 1000;
893                         max_wait -= seconds * 1000;
894                         ts.tv_sec += seconds;
895                 }
896                 if( max_wait > 0 ) {
897                         nano_sec = max_wait * 1000000;
898                         ts.tv_nsec += nano_sec;
899                         if( ts.tv_nsec > 999999999 ) {
900                                 ts.tv_nsec -= 999999999;
901                                 ts.tv_sec++;
902                         }
903                 }
904
905                 seconds = 1;                                                                                                    // use as flag later to invoked timed wait
906         }
907
908         errno = EINTR;
909         state = -1;
910         while( state < 0 && errno == EINTR ) {
911                 if( seconds ) {
912                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
913                 } else {
914                         state = sem_wait( &chute->barrier );
915                 }
916         }
917
918         if( state < 0 ) {
919                 mbuf = ombuf;                           // return caller's buffer if they passed one in
920         } else {
921                 errno = 0;                                              // interrupted call state could be left; clear
922                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
923                 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
924                         mbuf->state = RMR_OK;
925                         mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
926
927                         if( ombuf ) {
928                                 rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
929                         }
930                 } else {
931                         errno = ETIMEDOUT;
932                         mbuf = ombuf;                           // no buffer, return user's if there
933                 }
934         }
935
936         if( mbuf ) {
937                 mbuf->tp_state = errno;
938         }
939         return mbuf;
940 }
941
942
943
944
945 /*
946         This is the work horse for the multi-threaded call() function. It supports
947         both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
948         for for rmr_mt_call() modulo the caveat below.
949
950         If endpoint is given, then we assume that we're not doing normal route table
951         routing and that we should send directly to that endpoint (probably worm
952         hole).
953 */
954 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
955         rmr_mbuf_t* ombuf;                      // original mbuf passed in
956         uta_ctx_t*      ctx;
957         uta_mhdr_t*     hdr;                    // header in the transport buffer
958         chute_t*        chute;
959         unsigned char*  d1;                     // d1 data in header
960         struct timespec ts;                     // time info if we have a timeout
961         long    new_ms;                         // adjusted mu-sec
962         long    seconds = 0;            // max wait seconds
963         long    nano_sec;                       // max wait xlated to nano seconds
964         int             state;
965
966         errno = EINVAL;
967         if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
968                 if( mbuf ) {
969                         mbuf->tp_state = errno;
970                         mbuf->state = RMR_ERR_BADARG;
971                 }
972                 return mbuf;
973         }
974
975         if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
976                 mbuf->state = RMR_ERR_NOTSUPP;
977                 mbuf->tp_state = errno;
978                 return mbuf;
979         }
980
981         if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
982                 mbuf->state = RMR_ERR_BADARG;
983                 mbuf->tp_state = errno;
984                 return mbuf;
985         }
986
987         ombuf = mbuf;                                                                                                   // save to return timeout status with
988
989         chute = &ctx->chutes[call_id];
990         if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
991                 rmr_free_msg( chute->mbuf );
992                 chute->mbuf = NULL;
993         }
994
995         hdr = (uta_mhdr_t *) mbuf->header;
996         hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
997         memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
998         d1 = DATA1_ADDR( hdr );
999         d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
1000         mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
1001
1002         if( max_wait >= 0 ) {
1003                 clock_gettime( CLOCK_REALTIME, &ts );
1004
1005                 if( max_wait > 999 ) {
1006                         seconds = max_wait / 1000;
1007                         max_wait -= seconds * 1000;
1008                         ts.tv_sec += seconds;
1009                 }
1010                 if( max_wait > 0 ) {
1011                         nano_sec = max_wait * 1000000;
1012                         ts.tv_nsec += nano_sec;
1013                         if( ts.tv_nsec > 999999999 ) {
1014                                 ts.tv_nsec -= 999999999;
1015                                 ts.tv_sec++;
1016                         }
1017                 }
1018
1019                 seconds = 1;                                                                            // use as flag later to invoked timed wait
1020         }
1021
1022         if( ep == NULL ) {                                                                              // normal routing
1023                 mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
1024         } else {
1025                 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1026         }
1027         if( mbuf ) {
1028                 if( mbuf->state != RMR_OK ) {
1029                         mbuf->tp_state = errno;
1030                         return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
1031                 }
1032         }
1033
1034         state = 0;
1035         errno = 0;
1036         while( chute->mbuf == NULL && ! errno ) {
1037                 if( seconds ) {
1038                         state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
1039                 } else {
1040                         state = sem_wait( &chute->barrier );
1041                 }
1042
1043                 if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
1044                         errno = 0;
1045                 }
1046
1047                 if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
1048                         if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1049                                 rmr_free_msg( chute->mbuf );
1050                                 chute->mbuf = NULL;
1051                                 errno = 0;
1052                         }
1053                 }
1054         }
1055
1056         if( state < 0 ) {
1057                 return NULL;                                    // leave errno as set by sem wait call
1058         }
1059
1060         mbuf = chute->mbuf;
1061         if( mbuf != NULL ) {
1062                 mbuf->state = RMR_OK;
1063         }
1064         chute->mbuf = NULL;
1065
1066         return mbuf;
1067 }
1068
1069 /*
1070         Accept a message buffer and caller ID, send the message and then wait
1071         for the receiver to tickle the semaphore letting us know that a message
1072         has been received. The call_id is a value between 2 and 255, inclusive; if
1073         it's not in this range an error will be returned. Max wait is the amount
1074         of time in millaseconds that the call should block for. If 0 is given
1075         then no timeout is set.
1076
1077         If the mt_call feature has not been initialised, then the attempt to use this
1078         funciton will fail with RMR_ERR_NOTSUPP
1079
1080         If no matching message is received before the max_wait period expires, a
1081         nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1082         occurs after the message has been sent, then a nil pointer is returned
1083         with errno set to some other value.
1084
1085         This is now just an outward facing wrapper so we can support wormhole calls.
1086 */
1087 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1088         return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1089 }
1090
1091
1092 /*
1093         Given an existing message buffer, reallocate the payload portion to
1094         be at least new_len bytes.  The message header will remain such that
1095         the caller may use the rmr_rts_msg() function to return a payload
1096         to the sender.
1097
1098         The mbuf passed in may or may not be reallocated and the caller must
1099         use the returned pointer and should NOT assume that it can use the
1100         pointer passed in with the exceptions based on the clone flag.
1101
1102         If the clone flag is set, then a duplicated message, with larger payload
1103         size, is allocated and returned.  The old_msg pointer in this situation is
1104         still valid and must be explicitly freed by the application. If the clone
1105         message is not set (0), then any memory management of the old message is
1106         handled by the function.
1107
1108         If the copy flag is set, the contents of the old message's payload is
1109         copied to the reallocated payload.  If the flag is not set, then the
1110         contents of the payload is undetermined.
1111 */
1112 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1113         if( old_msg == NULL ) {
1114                 return NULL;
1115         }
1116
1117         return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
1118 }
1119
1120 /*
1121         Enable low latency things in the transport (when supported).
1122 */
1123 extern void rmr_set_low_latency( void* vctx ) {
1124         uta_ctx_t*      ctx;
1125
1126         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1127                 if( ctx->si_ctx != NULL ) {
1128                         SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1129                 }
1130         }
1131 }
1132
1133 /*
1134         Turn on fast acks.
1135 */
1136 extern void rmr_set_fack( void* vctx ) {
1137         uta_ctx_t*      ctx;
1138
1139         if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1140                 if( ctx->si_ctx != NULL ) {
1141                         SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );
1142                 }
1143         }
1144 }
1145