1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the si version of the rmr
24 library (formerly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised compile using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
59 #define SI95_BUILD 1 // we drop some common functions for si
61 #include "rmr.h" // things the users see
62 #include "rmr_agnostic.h" // agnostic things (must be included before private)
63 #include "rmr_si_private.h" // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
67 #include "ring_static.c" // message ring support
68 #include "rt_generic_static.c" // route table things not transport specific
69 #include "rtable_si_static.c" // route table things -- transport specific
70 #include "rtc_static.c" // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c" // send/receive static functions
73 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
78 //------------------------------------------------------------------------------
// Context clean-up helper. In the lines visible here the only action is to free
// ctx->rtg_addr when both the context and that pointer are non-nil.
// NOTE(review): the embedded numbering jumps from 86 to 90, so the closing braces
// (and anything else on source lines 87-89) are missing from this extract; restore
// them from the upstream file before building.
84 static void free_ctx( uta_ctx_t* ctx ) {
85 if( ctx && ctx->rtg_addr ) {
86 free( ctx->rtg_addr );
90 // --------------- public functions --------------------------------------------------------------------------
93 Returns the size of the payload (bytes) that the msg buffer references.
94 Len in a message is the number of bytes which were received, or should
95 be transmitted, however, it is possible that the mbuf was allocated
96 with a larger payload space than the payload length indicates; this
97 function returns the absolute maximum space that the user has available
98 in the payload. On error (bad msg buffer) -1 is returned and errno should
101 The allocated len stored in the msg is:
102 transport header length +
104 user requested payload
106 The msg header is a combination of the fixed RMR header and the variable
107 trace data and d2 fields which may vary for each message.
// Returns the payload space available to the user: the allocated length less the
// RMR header length (RMR_HDR_LEN) and the transport header length (TP_HDR_LEN).
// NOTE(review): source lines 111-115 (the body of the nil check, presumably the error
// return) are absent from this extract -- confirm against the upstream file.
109 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
110 if( msg == NULL || msg->header == NULL ) { // nothing to measure without a message and header
116 return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN; // allocated transport size less the header and other data bits
120 Allocates a send message as a zerocopy message allowing the underlying message protocol
121 to send the buffer without copy.
// Allocates a zero-copy message buffer of the requested payload size, using the
// default trace length (DEF_TR_LEN). vctx is the opaque context handed to the user.
// NOTE(review): the local declarations, the nil-context return and the final return
// (source lines 124-126, 128-130, 132 onward) are absent from this extract.
123 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
127 if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // a nil context cannot allocate
131 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
137 Allocates a send message as a zerocopy message allowing the underlying message protocol
138 to send the buffer without copy. In addition, a trace data field of tr_size will be
139 added and the supplied data copied to the buffer before returning the message to
// Allocates a zero-copy message with a trace area of tr_size bytes and copies the
// caller's trace data into it via rmr_set_trace(). If rmr_set_trace() does not report
// tr_size bytes, the message state is set to RMR_ERR_INITFAILED.
// NOTE(review): source line 152 is absent; whether the result of alloc_zcmsg() is
// checked for nil before rmr_set_trace() is called cannot be seen here -- confirm.
142 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
147 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
151 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
153 state = rmr_set_trace( m, data, tr_size ); // roll their data in
154 if( state != tr_size ) { // anything other than the full length is treated as failure
155 m->state = RMR_ERR_INITFAILED;
163 This provides an external path to the realloc static function as it's called by an
164 outward facing mbuf api function. Used to reallocate a message with a different
// Public pass-through to the static realloc_msg(); both arguments are forwarded
// unchanged and its result is returned as is.
// NOTE(review): the closing brace (source line 169) is absent from this extract.
167 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
168 return realloc_msg( msg, new_tr_size );
173 Return the message to the available pool, or free it outright.
// Gives a message buffer back: it is offered to its ring for reuse, and when that is
// not possible (huge message, no ring, or the insert fails) the transport buffer is
// freed and the cookie cleared so the mbuf is recognisably dead.
// NOTE(review): source lines 176-179, 183-184, 187-188 and 190 onward are absent from
// this extract, so the nil check on mbuf and the final release of the mbuf itself are
// not visible -- confirm against the upstream file.
175 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
180 if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
181 ! mbuf->ring || // can't cache if no ring
182 ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
185 free( mbuf->tp_buf );
186 mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
189 mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
195 This is a wrapper to the real timeout send. We must wrap it now to ensure that
196 the call flag and call-id are reset
// Timed-send wrapper: switches off the call flag and overwrites the call-id byte in
// the header's data1 area with NO_CALL_ID, then passes the message to mtosend_msg()
// with the caller's max_to value.
// NOTE(review): source lines 200-201, 203 and 206-207 are absent; any nil checks on
// msg or msg->header that guard the header writes cannot be seen here -- confirm.
198 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
199 char* d1; // point at the call-id in the header
202 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
204 d1 = DATA1_ADDR( msg->header );
205 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
208 return mtosend_msg( vctx, msg, max_to );
212 Send with default max timeout as is set in the context.
213 See rmr_mtosend_msg() for more details on the parameters.
214 See rmr_stimeout() for info on setting the default timeout.
// Sends with the default timeout by calling rmr_mtosend_msg() with max_to of -1.
// The call flag and call-id are reset here before the hand-off.
// NOTE(review): rmr_mtosend_msg() performs the same flag/call-id reset itself, so the
// reset below is repeated work; harmless, but a candidate for removal.
// NOTE(review): source lines 218-219, 221 and 224-225 are absent; nil checks guarding
// the header writes cannot be seen in this extract -- confirm.
216 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
217 char* d1; // point at the call-id in the header
220 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
222 d1 = DATA1_ADDR( msg->header );
223 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
226 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
230 Return to sender allows a message to be sent back to the endpoint where it originated.
232 With SI95 it was thought that the return to sender would be along the same open connection
233 and thus no table lookup would be needed to open a 'reverse direction' path. However, for
234 applications sending at high message rates, returning responses on the same connection
235 causes major strife. Thus the decision was made to use the same method as NNG and just
236 open a second connection for reverse path.
238 We will attempt to use the name in the received message to look up the endpoint. If
239 that fails, then we will write on the connection that the message arrived on as a
242 On success (state is RMR_OK), the caller may use the buffer for another receive operation,
243 and on error it can be passed back to this function to retry the send if desired. On error,
244 errno will likely have the failure reason set by the nng send processing. The following
245 are possible values for the state in the message buffer:
247 Message states returned:
248 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
249 RMR_ERR_NOHDR - message did not have a header
250 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
251 RMR_ERR_SENDFAILED - send failed; errno has nano error code
252 RMR_ERR_RETRY - the request failed but should be retried (EAGAIN)
254 A nil message as the return value is rare, and generally indicates some kind of horrible
255 failure. The value of errno might give a clue as to what is wrong.
258 Like send_msg(), this is non-blocking and will return the msg if there is an error.
259 The caller must check for this and handle it properly.
// Return-to-sender: looks up an endpoint socket from the source name carried in the
// message header (falling back to msg->rts_fd and, for header versions above 2, the
// source IP), overlays the header source with our own name, sends, bumps the endpoint
// send counters according to the resulting state, and finally puts the original source
// name and IP back so the caller can retry with the same buffer.
// NOTE(review): many source lines are absent from this extract (263, 268, 271, 274-277,
// 283-285, 287, 289, 293-294, 296-301, 307-308, 310, 312-314, 316-318, 321-323, 327
// onward). Conditions, case labels, returns and the release of hold_src/hold_ip are
// therefore not visible and are not described here.
261 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
262 int nn_sock; // endpoint socket for send
264 char* hold_src; // we need the original source if send fails
265 char* hold_ip; // also must hold original ip
266 int sock_ok = 0; // true if we found a valid endpoint socket
267 endpoint_t* ep = NULL; // end point to track counts
269 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
270 errno = EINVAL; // if msg is null, this is their clue
272 msg->state = RMR_ERR_BADARG;
273 msg->tp_state = errno;
278 errno = 0; // at this point any bad state is in msg returned
279 if( msg->header == NULL ) {
// NOTE(review): the log text below names rmr_send_msg although this is rmr_rts_msg.
280 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
281 msg->state = RMR_ERR_NOHDR;
282 msg->tp_state = errno;
286 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
288 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
290 if( (nn_sock = msg->rts_fd) < 0 ) { // fall back to the fd recorded in the message
291 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
292 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
295 msg->state = RMR_ERR_NOENDPT;
302 msg->state = RMR_OK; // ensure it is clear before send
// NOTE(review): the strdup() results are used later as strncpy() sources; no nil check
// sits between source lines 303 and 306.
303 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
304 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
// NOTE(review): strncpy() does not terminate the destination when the source is
// RMR_MAX_SRC bytes or longer -- confirm the field is sized/terminated elsewhere.
305 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
306 msg = send_msg( ctx, msg, nn_sock, -1 );
309 switch( msg->state ) { // count the outcome against the endpoint
311 ep->scounts[EPSC_GOOD]++;
315 ep->scounts[EPSC_TRANS]++;
319 // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed
320 ep->scounts[EPSC_FAIL]++;
324 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
325 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source ip so rts can be called again
326 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
335 If multi-threading call is turned on, this invokes that mechanism with the special call
336 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
337 behaviour (described below) is carried out. This is safe to use when mt is enabled, but
338 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
341 On timeout this function will return a nil pointer. If the original message could not
342 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
345 Call sends the message based on message routing using the message type, and waits for a
346 response message to arrive with the same transaction id that was in the outgoing message.
347 If, while waiting for the expected response, messages are received which do not have the
348 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
349 order that they were received.
351 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
352 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
353 may be resent (likely the context pointer was nil). If the message is sent, but no
354 response is received, a nil message is returned with errno set to indicate the likely
356 ETIMEDOUT -- too many messages were queued before receiving the expected response
357 ENOBUFS -- the queued message ring is full, messages were dropped
358 EINVAL -- A parameter was not valid
359 EAGAIN -- the underlying message system was interrupted or the device was busy;
360 user should call this function with the message again.
// Single-threaded style call: after the argument check it delegates to mt_call() with
// call-id 1, a 1000 ms maximum wait and no explicit endpoint.
// NOTE(review): source lines 364-365, 367 and 369-372 are absent; the guard that keeps
// msg->state from being written through a nil msg is not visible here -- confirm.
363 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
366 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
368 msg->state = RMR_ERR_BADARG;
373 return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec
377 The outward facing receive function. When invoked it will pop the oldest message
378 from the receive ring, if any are queued, and return it. If the ring is empty
379 then the receive function is invoked to wait for the next message to arrive (blocking).
381 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
382 nil, a new one will be allocated. However, the caller should NOT expect to get the same
383 struct back (if a queued message is returned the message struct will be different).
// Blocking receive: validates the context and then calls rmr_mt_rcv() with a wait of
// -1. On a nil context the caller's buffer, if supplied, is marked RMR_ERR_BADARG.
// NOTE(review): qm is declared but not used in any line visible in this extract.
// NOTE(review): source lines 386, 388, 390 and 394-398 are absent (declaration of ctx,
// errno assignment and the error return are not visible).
385 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
387 rmr_mbuf_t* qm; // message that was queued on the ring
389 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
391 if( old_msg != NULL ) { // only mark the buffer when the caller gave us one
392 old_msg->state = RMR_ERR_BADARG;
393 old_msg->tp_state = errno;
399 return rmr_mt_rcv( ctx, old_msg, -1 );
403 This allows a timeout based receive for applications unable to implement epoll_wait()
// Receive with timeout: validates the context and then calls rmr_mt_rcv() with the
// caller's ms_to value. On a nil context the caller's buffer, if supplied, is marked
// RMR_ERR_BADARG.
// NOTE(review): source lines 407-408, 410 and 414-417 are absent from this extract.
406 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
409 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
411 if( old_msg != NULL ) { // only mark the buffer when the caller gave us one
412 old_msg->state = RMR_ERR_BADARG;
413 old_msg->tp_state = errno;
418 return rmr_mt_rcv( ctx, old_msg, ms_to );
422 DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
423 too. This function likely will not behave as expected in SI, and we are pretty sure it
424 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
426 This blocks until the message with the 'expect' ID is received. Messages which are received
427 before the expected message are queued onto the message ring. The function will return
428 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
429 expected message is received. If the queued message ring fills a nil pointer is returned
430 and errno is set to ENOBUFS.
432 Generally this will be invoked only by the call() function as it waits for a response, but
433 it is exposed to the user application as there is no reason not to.
// Receives until a message whose xaction field begins with the expected id arrives.
// Non-matching messages in RMR_OK state are pushed onto ctx->mring; the loop runs while
// fewer than allow2queue messages have been queued. A nil or empty expect string turns
// this into a plain rmr_rcv_msg().
// The compare length is the length of expect, capped at RMR_MAX_XID.
// NOTE(review): source lines 436, 439, 441-442, 445-450, 453-454, 458, 460, 463,
// 468-470, 473-476, 478-483 and 485 onward are absent; returns, errno settings and the
// increment of queued are therefore not visible and are not described here.
435 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
437 int queued = 0; // number we pushed into the ring
438 int exp_len = 0; // length of expected ID
440 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
443 msg->state = RMR_ERR_BADARG;
444 msg->tp_state = errno;
451 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
452 return rmr_rcv_msg( ctx, msg );
455 exp_len = strlen( expect );
456 if( exp_len > RMR_MAX_XID ) { // never compare beyond the transaction id field
457 exp_len = RMR_MAX_XID;
459 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
461 while( queued < allow2queue ) {
462 msg = rmr_rcv_msg( ctx, msg ); // hard wait for next
464 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state );
465 if( msg->state == RMR_OK ) {
466 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
467 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
471 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
472 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
477 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
484 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
490 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
491 _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
492 mechanism indicates eagain or etimedout. All other error conditions are reported
493 without this delay. Setting a timeout of 0 causes no retries to be attempted in
494 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
495 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
496 after every 1K send attempts until the "time" value is reached. Retries are abandoned
497 if NNG returns anything other than EAGAIN or EINTR.
499 The default, if this function is not used, is 1; meaning that RMr will retry, but will
500 not enter a sleep. In all cases the caller should check the status in the message returned
503 Returns -1 if the context was invalid; RMR_OK otherwise.
// Stores the caller's send timeout value in ctx->send_retries after checking that the
// context is not nil.
// NOTE(review): source lines 506-507, 509-515 and 517 onward are absent; any clamping
// of time and both return statements are not visible in this extract.
505 extern int rmr_set_stimeout( void* vctx, int time ) {
508 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
516 ctx->send_retries = time;
521 Set receive timeout -- not supported in nng implementation
523 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
// Receive timeout is not supported by this transport; the only visible action is to
// log a warning saying so. Neither argument is used in the visible lines.
// NOTE(review): the return statement and closing brace (source line 527 onward) are
// absent from this extract.
525 extern int rmr_set_rtimeout( void* vctx, int time ) {
526 rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
532 This is the actual init workhorse. The user visible function merely ensures that the
533 calling programme does NOT set any internal flags that are supported, and then
534 invokes this. Internal functions (the route table collector) which need additional
535 open ports without starting additional route table collectors, will invoke this
536 directly with the proper flag.
538 CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
539 that we know about. The _user_ should ensure that the supplied length also
540 includes the trace data length maximum as they are in control of that.
// Builds and returns a fully initialised context. In the visible lines it: announces
// the library version, copies the proto:port string, allocates the context and its
// rivers, sizes the inbound buffer, creates and configures the message rings, sets up
// call chutes and the fd-to-endpoint table, initialises SI95, works out our name and
// IP, starts the SI listener, and finally starts the route table collector and
// multi-threaded receiver threads.
// NOTE(review): a large number of short source lines (declarations, else branches,
// error returns, closing braces, the final return) are absent from this extract; only
// what the visible lines show is described.
542 static void* init( char* uproto_port, int def_msg_size, int flags ) {
543 static int announced = 0;
544 uta_ctx_t* ctx = NULL;
545 char bind_info[256]; // bind info
546 char* proto = "tcp"; // pointer into the proto/port string user supplied
548 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
550 char wbuf[1024]; // work buffer
551 char* tok; // pointer at token in a buffer
553 int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
558 old_vlevel = rmr_vlog_init(); // initialise and get the current level
559 rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
562 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
563 RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
566 rmr_set_vlevel( old_vlevel ); // return logging to the desired state
569 if( uproto_port == NULL ) {
570 proto_port = strdup( DEF_COMM_PORT );
572 proto_port = strdup( uproto_port ); // so we can modify it
575 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
579 memset( ctx, 0, sizeof( uta_ctx_t ) );
// NOTE(review): the debug text says 266 but the count assigned on the next line is 256.
581 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
582 ctx->nrivers = 256; // number of input flows we'll manage
// NOTE(review): source lines 583 and 584 are consecutive, so the malloc() result is
// passed to memset() without a nil check.
583 ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
584 memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
585 for( i = 0; i < ctx->nrivers; i++ ) {
586 ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
589 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
590 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
591 ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt
592 ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
594 ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
595 ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
597 if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
598 uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
599 uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
601 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
603 init_mtcall( ctx ); // set up call chutes
604 fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
607 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload length
608 if( def_msg_size > 0 ) { // a caller supplied size replaces the default
609 ctx->max_plen = def_msg_size;
612 // we're using a listener to get rtg updates, so we do NOT need this.
613 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
615 ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
616 if( ctx->si_ctx == NULL ) {
617 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
622 if( (port = strchr( proto_port, ':' )) != NULL ) { // split "proto:port" at the colon
623 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
626 *(port++) = 0; // term proto string and point at port string
627 proto = proto_port; // user supplied proto so point at it rather than default
630 port = proto_port; // assume something like "1234" was passed
633 if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
637 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
638 tok = strdup( tok ); // something we can destroy
639 if( *tok == '[' ) { // we allow an ipv6 address here
// NOTE(review): if the value has '[' but no ']', strchr() returns nil and adding 1
// gives an invalid non-nil pointer that passes the tok2 test on source line 644.
640 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
642 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
644 if( tok2 && *tok2 ) { // if it's not the end of string marker
645 *tok2 = 0; // make it so
648 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
651 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
652 rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
655 if( (tok = strchr( wbuf, '.' )) != NULL ) {
656 *tok = 0; // we don't keep domain portion
// NOTE(review): source lines 660 and 661 are consecutive, so the malloc() result is
// handed to snprintf() without a nil check.
660 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
661 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
662 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
666 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
667 if( atoi( tok ) > 0 ) {
668 flags |= RMRFL_NAME_ONLY; // don't allow IP address to go out in messages
672 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
673 if( flags & RMRFL_NAME_ONLY ) {
674 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
676 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
677 if( ctx->my_ip == NULL ) {
678 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
679 ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
682 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
684 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
686 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
691 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
692 interface = "0.0.0.0";
695 snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
696 if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
697 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
702 // finish all flag setting before threads to keep helgrind quiet
703 ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
705 if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself)
706 ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
// NOTE(review): uproto_port may be nil (see the check on source line 569); the two
// info messages below pass it to %s. The local port string would be the safe value.
709 rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
710 if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
711 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
714 rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
715 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
716 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
721 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
722 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
730 Initialise the message routing environment. Flags are one of the UTAFL_
731 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
732 (tcp) to be used, then :port is all that is needed.
734 At the moment it seems that TCP really is the only viable protocol, but
735 we'll allow flexibility.
737 The return value is a void pointer which must be passed to most uta functions. On
738 error, a nil pointer is returned and errno should be set.
741 No user flags supported (needed) at the moment, but this provides for extension
742 without drastically changing anything. The user should invoke with RMRFL_NONE to
743 avoid any misbehaviour as there are internal flags which are supported
// Public initialisation entry point: masks the caller's flags with UFL_MASK and passes
// everything on to init(), returning whatever init() returns.
// NOTE(review): the closing brace (source line 747) is absent from this extract.
745 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
746 return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
750 This sets the default trace length which will be added to any message buffers
751 allocated. It can be set at any time, and if rmr_set_trace() is given a
752 trace len that is different than the default allocated in a message, the message
755 Returns 0 on failure and 1 on success. If failure, then errno will be set.
// Records tr_len as the context's default trace data length after checking that the
// context is not nil.
// NOTE(review): source lines 758-760, 762-765 and 767 onward are absent; the errno
// settings and return values are not visible in this extract.
757 extern int rmr_init_trace( void* vctx, int tr_len ) {
761 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
766 ctx->trace_data_len = tr_len;
771 Return true if routing table is initialised etc. and app can send/receive.
// Readiness probe: the visible tests are for a nil context and for a non-nil
// ctx->rtable.
// NOTE(review): source lines 774-775, 777-779 and 781 onward are absent; the values
// returned on each path are not visible in this extract.
773 extern int rmr_ready( void* vctx ) {
776 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
780 if( ctx->rtable != NULL ) { // a route table is the visible readiness criterion
788 This returns the message queue ring's filedescriptor which can be used for
789 calls to epoll. The user should NOT read, write, or close the fd.
791 Returns the file descriptor or -1 on error.
// Returns the pollable file descriptor of the receive ring via uta_ring_getpfd() after
// checking that the context is not nil.
// NOTE(review): the nng_getopt_int()/nng_strerror() lines look like NNG-era leftovers.
// Source lines 801 and 804-807 are absent from this extract, so it cannot be seen
// whether they sit inside a comment -- confirm against the upstream file.
793 extern int rmr_get_rcvfd( void* vctx ) {
797 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
802 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
803 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
808 return uta_ring_getpfd( ctx->mring );
815 There isn't an si_flush() per se, but we can pause, generate
816 a context switch, which should allow the last sent buffer to
817 flow. There isn't exactly an nng_term/close either, so there
818 isn't much we can do.
// Close hook: after the nil-context check the only visible action is a call to
// SItp_stats() on the SI context. No session shutdown is performed in the visible
// lines (see the FIX ME below).
// NOTE(review): source lines 821-822, 824-828, 830 and 833 onward are absent from
// this extract.
820 extern void rmr_close( void* vctx ) {
823 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
829 SItp_stats( ctx->si_ctx ); // dump some interesting stats
831 // FIX ME -- how do we turn off si; close all sessions etc?
832 //SIclose( ctx->nn_sock );
837 // ----- multi-threaded call/receive support -------------------------------------------------
840 Blocks on the receive ring chute semaphore and then reads from the ring
841 when it is tickled. If max_wait is -1 then the function blocks until
842 a message is ready on the ring. Else max_wait is assumed to be the number
843 of milliseconds to wait before returning a timeout message.
// Receive from the normal message ring. A max_wait of 0 polls the ring once without
// touching the semaphore; otherwise the function waits on the barrier semaphore of
// chute 0 (timed when a deadline was computed, untimed otherwise) and then extracts
// from ctx->mring. A message taken from the ring replaces the caller's buffer, which is
// released with rmr_free_msg(); when nothing is available the caller's buffer is
// returned carrying RMR_ERR_TIMEOUT.
// NOTE(review): new_ms is declared but not used in any line visible in this extract.
// NOTE(review): many source lines are absent (846-847, 852, 854, 856-857, 860-863,
// 870, 872-873, 877-881, 883-888, 890-892, 899-900, 905-908, 910-913, 915, 917,
// 919-922, 924, 930-931, 933-935, 937-940, 942 onward); conditions, else branches and
// returns on those lines are not described here.
845 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
848 struct timespec ts; // time info if we have a timeout
849 long new_ms; // adjusted mu-sec
850 long seconds = 0; // max wait seconds
851 long nano_sec; // max wait xlated to nano seconds
853 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
855 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
858 mbuf->state = RMR_ERR_BADARG;
859 mbuf->tp_state = errno;
864 ombuf = mbuf; // if we timeout we must return original msg with status, so save it
866 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
868 if( max_wait == 0 ) { // one shot poll; handle without sem check as that is SLOW!
869 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
871 rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
874 mbuf = ombuf; // return original if it was given with timeout status
875 if( ombuf != NULL ) {
876 mbuf->state = RMR_ERR_TIMEOUT; // preset if for failure
882 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
889 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
893 clock_gettime( CLOCK_REALTIME, &ts ); // sem timeout based on clock, not a delta
895 if( max_wait > 999 ) { // move whole seconds into tv_sec, leaving the sub-second remainder
896 seconds = max_wait / 1000;
897 max_wait -= seconds * 1000;
898 ts.tv_sec += seconds;
901 nano_sec = max_wait * 1000000; // remaining milliseconds as nanoseconds
902 ts.tv_nsec += nano_sec;
// NOTE(review): one second is 1000000000 ns; subtracting 999999999 on rollover leaves
// tv_nsec one nanosecond too high. The subtrahend should be 1000000000.
903 if( ts.tv_nsec > 999999999 ) {
904 ts.tv_nsec -= 999999999;
909 seconds = 1; // use as flag later to invoked timed wait
914 while( state < 0 && errno == EINTR ) { // go back to waiting when a signal interrupted the wait
916 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
918 state = sem_wait( &chute->barrier );
923 mbuf = ombuf; // return caller's buffer if they passed one in
925 errno = 0; // interrupted call state could be left; clear
926 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
927 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
928 mbuf->state = RMR_OK;
929 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
932 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
936 mbuf = ombuf; // no buffer, return user's if there
941 mbuf->tp_state = errno;
950 This is the work horse for the multi-threaded call() function. It supports
951 both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
952 for rmr_mt_call() modulo the caveat below.
954 If endpoint is given, then we assume that we're not doing normal route table
955 routing and that we should send directly to that endpoint (probably worm
// Worker behind the multi-threaded call: marks the outgoing header as a call message,
// records the transaction id to wait for in the chute selected by call_id, stamps the
// call id into the header's data1 area, sends (by route table when ep is nil, directly
// to ep->nn_sock otherwise), then waits on the chute's barrier semaphore until the
// chute holds a message or errno is set. A message in the chute whose xaction does not
// match the expected id is released with rmr_free_msg().
// call_id is used directly as an index into ctx->chutes; no range check is visible in
// this function.
// NOTE(review): new_ms is declared but not used in any line visible in this extract.
// NOTE(review): many source lines are absent (960, 962, 968-970, 972, 975-978,
// 982-984, 986, 990-992, 999, 1002, 1007, 1013-1016, 1018-1019, 1022, 1024-1025,
// 1029-1033, 1035, 1037, 1039-1040, 1042-1044, 1048-1054, 1056-1058, 1061 onward);
// else branches, returns and assignments on those lines are not described here.
958 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
959 rmr_mbuf_t* ombuf; // original mbuf passed in
961 uta_mhdr_t* hdr; // header in the transport buffer
963 unsigned char* d1; // d1 data in header
964 struct timespec ts; // time info if we have a timeout
965 long new_ms; // adjusted mu-sec
966 long seconds = 0; // max wait seconds
967 long nano_sec; // max wait xlated to nano seconds
971 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
973 mbuf->tp_state = errno;
974 mbuf->state = RMR_ERR_BADARG;
979 if( ! (ctx->flags & CFL_MTC_ENABLED) ) { // calls need the multi-threaded receiver
980 mbuf->state = RMR_ERR_NOTSUPP;
981 mbuf->tp_state = errno;
985 ombuf = mbuf; // save to return timeout status with
987 chute = &ctx->chutes[call_id];
988 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
989 rmr_free_msg( chute->mbuf );
993 hdr = (uta_mhdr_t *) mbuf->header;
994 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
995 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
996 d1 = DATA1_ADDR( hdr );
997 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
998 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend)
1000 if( max_wait >= 0 ) { // a non-negative wait means a deadline must be computed
1001 clock_gettime( CLOCK_REALTIME, &ts );
1003 if( max_wait > 999 ) { // move whole seconds into tv_sec, leaving the sub-second remainder
1004 seconds = max_wait / 1000;
1005 max_wait -= seconds * 1000;
1006 ts.tv_sec += seconds;
1008 if( max_wait > 0 ) {
1009 nano_sec = max_wait * 1000000; // remaining milliseconds as nanoseconds
1010 ts.tv_nsec += nano_sec;
// NOTE(review): one second is 1000000000 ns; subtracting 999999999 on rollover leaves
// tv_nsec one nanosecond too high. The subtrahend should be 1000000000.
1011 if( ts.tv_nsec > 999999999 ) {
1012 ts.tv_nsec -= 999999999;
1017 seconds = 1; // use as flag later to invoked timed wait
1020 if( ep == NULL ) { // normal routing
1021 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1023 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
// NOTE(review): the comment on source line 1021 says nil is expected on success, yet
// mbuf->state is read below; the guard is presumably on absent line 1025 -- confirm.
1026 if( mbuf->state != RMR_OK ) {
1027 mbuf->tp_state = errno;
1028 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1034 while( chute->mbuf == NULL && ! errno ) {
1036 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1038 state = sem_wait( &chute->barrier );
1041 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1045 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1046 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1047 rmr_free_msg( chute->mbuf );
1055 return NULL; // leave errno as set by sem wait call
1059 if( mbuf != NULL ) {
1060 mbuf->state = RMR_OK;
1068 Accept a message buffer and caller ID, send the message and then wait
1069 for the receiver to tickle the semaphore letting us know that a message
1070 has been received. The call_id is a value between 2 and 255, inclusive; if
1071 it's not in this range an error will be returned. Max wait is the amount
1072 of time in milliseconds that the call should block for. If a negative
1073 value is given then no timeout is set.
1075 If the mt_call feature has not been initialised, then the attempt to use this
1076 function will fail with RMR_ERR_NOTSUPP
1078 If no matching message is received before the max_wait period expires, a
1079 nil pointer is returned, and errno is set to ETIMEDOUT. If any other error
1080 occurs after the message has been sent, then a nil pointer is returned
1081 with errno set to some other value.
1083 This is now just an outward facing wrapper so we can support wormhole calls.
1085 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1087 // must vet call_id here, all others vetted by workhorse mt_call() function
1088 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
1089 if( mbuf != NULL ) {
1090 mbuf->state = RMR_ERR_BADARG;
1091 mbuf->tp_state = EINVAL;
1096 return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1101 Given an existing message buffer, reallocate the payload portion to
1102 be at least new_len bytes. The message header will remain such that
1103 the caller may use the rmr_rts_msg() function to return a payload
1106 The mbuf passed in may or may not be reallocated and the caller must
1107 use the returned pointer and should NOT assume that it can use the
1108 pointer passed in with the exceptions based on the clone flag.
1110 If the clone flag is set, then a duplicated message, with larger payload
1111 size, is allocated and returned. The old_msg pointer in this situation is
1112 still valid and must be explicitly freed by the application. If the clone
1113 flag is not set (0), then any memory management of the old message is
1114 handled by the function.
1116 If the copy flag is set, the contents of the old message's payload are
1117 copied to the reallocated payload. If the flag is not set, then the
1118 contents of the payload are undetermined.
1120 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1121 if( old_msg == NULL ) {
1125 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
1129 Enable low latency things in the transport (when supported).
1131 extern void rmr_set_low_latency( void* vctx ) {
1134 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1135 if( ctx->si_ctx != NULL ) {
1136 SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1144 extern void rmr_set_fack( void* vctx ) {
1147 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1148 if( ctx->si_ctx != NULL ) {
1149 SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );