1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2021 Nokia
5 Copyright (c) 2018-2021 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the si version of the rmr
24 library (formerly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised compile using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
60 #define SI95_BUILD 1 // we drop some common functions for si
62 #include "rmr.h" // things the users see
63 #include "rmr_agnostic.h" // agnostic things (must be included before private)
64 #include "rmr_si_private.h" // things that we need too
66 #include "rmr_symtab.h"
67 #include "rmr_logging.h"
69 #include "ring_static.c" // message ring support
70 #include "rt_generic_static.c" // route table things not transport specific
71 #include "rtable_si_static.c" // route table things -- transport specific
73 #include "rtc_static.c" // route table collector (thread code)
74 #include "tools_static.c"
75 #include "sr_si_static.c" // send/receive static functions
76 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
77 #include "mt_call_static.c"
78 #include "mt_call_si_static.c"
79 #include "rmr_debug_si.c" // debuging functions
82 //------------------------------------------------------------------------------
85 If we have an EP, up the counters based on state.
86 This isn't needed, but it makes driving the code under unit test easier so we
87 indulge in the bean counter's desire for coverage numbers.
// Bump one of the endpoint's send-outcome counters (EPSC_GOOD, EPSC_TRANS or EPSC_FAIL) based on state.
// NOTE(review): the state-to-counter mapping and the nil-ep guard are not visible in this extract; confirm.
89 static inline void incr_ep_counts( int state, endpoint_t* ep ) {
93 ep->scounts[EPSC_GOOD]++;
97 ep->scounts[EPSC_TRANS]++;
101 ep->scounts[EPSC_FAIL]++;
// Release resources hanging off a context: the rtg address, the message and zero-copy rings,
// the fd-to-endpoint symtab, our name string and the current route table's hash.
// NOTE(review): any guards, and whether ctx itself is freed, are not visible in this extract.
110 static void free_ctx( uta_ctx_t* ctx ) {
113 free( ctx->rtg_addr );
115 uta_ring_free( ctx->mring );
116 uta_ring_free( ctx->zcb_mring );
121 rmr_sym_free( ctx->fd2ep );
124 free( ctx->my_name );
130 rmr_sym_free( ctx->rtable->hash );
140 // --------------- public functions --------------------------------------------------------------------------
143 Returns the size of the payload (bytes) that the msg buffer references.
144 Len in a message is the number of bytes which were received, or should
145 be transmitted, however, it is possible that the mbuf was allocated
146 with a larger payload space than the payload length indicates; this
147 function returns the absolute maximum space that the user has available
148 in the payload. On error (bad msg buffer) -1 is returned and errno should
151 The allocated len stored in the msg is:
152 transport header length +
154 user requested payload
156 The msg header is a combination of the fixed RMR header and the variable
157 trace data and d2 fields which may vary for each message.
// User-visible payload capacity: allocated length less the (variable) RMR header and the transport
// header. A nil message or a message without a header is rejected before any arithmetic.
159 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
160 if( msg == NULL || msg->header == NULL ) {
166 return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN; // allocated transport size less the header and other data bits
170 Allocates a send message as a zerocopy message allowing the underlying message protocol
171 to send the buffer without copy.
// Public allocator: a zero-copy mbuf with room for size payload bytes and the default trace
// length (DEF_TR_LEN). A nil context is rejected before anything is allocated.
173 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
177 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
181 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
187 Allocates a send message as a zerocopy message allowing the underlying message protocol
188 to send the buffer without copy. In addition, a trace data field of tr_size will be
189 added and the supplied data copied to the buffer before returning the message to
// Like rmr_alloc_msg(), but reserves a tr_size-byte trace area and fills it from data via
// rmr_set_trace(). If rmr_set_trace() reports a byte count other than tr_size, the mbuf's
// state is set to RMR_ERR_INITFAILED so the caller can detect the partial initialisation.
192 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
197 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
201 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
203 state = rmr_set_trace( m, data, tr_size ); // roll their data in
204 if( state != tr_size ) {
205 m->state = RMR_ERR_INITFAILED;
213 This provides an external path to the realloc static function as it's called by an
214 outward facing mbuf api function. Used to reallocate a message with a different
217 User programmes must use this with CAUTION! The mbuf passed in is NOT freed and
218 is still valid following this call. The caller is responsible for maintaining
219 a pointer to both old and new messages and invoking rmr_free_msg() on both!
// Public wrapper around the static realloc_msg(). The original mbuf is not freed; the caller
// owns both the old and the new mbuf and must pass each to rmr_free_msg().
221 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
222 return realloc_msg( msg, new_tr_size );
227 Return the message to the available pool, or free it outright.
// Release an mbuf. A nil mbuf only produces a diagnostic on stderr. Otherwise the mbuf is pushed
// back onto its ring for reuse unless it is flagged MFL_HUGE, has no ring, or the ring is full;
// in those cases the transport buffer is freed and the mbuf is marked invalid (tp_buf nulled,
// cookie zeroed) so later misuse is detectable.
// NOTE(review): the second free/clear sequence is the "never manage a pool" variant; the
// conditional selecting between the two paths is not visible in this extract.
229 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
231 fprintf( stderr, ">>>FREE nil buffer\n" );
236 if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
237 ! mbuf->ring || // cant cache if no ring
238 ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
241 free( mbuf->tp_buf );
242 mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
245 mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
249 // always free, never manage a pool
251 free( mbuf->tp_buf );
252 mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
255 mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
261 This is a wrapper to the real timeout send. We must wrap it now to ensure that
262 the call flag and call-id are reset
// Timed send for ordinary (non-call) messages: clears HFL_CALL_MSG and overwrites the call-id
// byte in the data1 area with NO_CALL_ID, so the receiver does not route the message to a call
// chute. It then defers to mtosend_msg() with max_to.
264 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
265 char* d1; // point at the call-id in the header
268 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
270 d1 = DATA1_ADDR( msg->header );
271 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
274 return mtosend_msg( vctx, msg, max_to );
278 Send with default max timeout as is set in the context.
279 See rmr_mtosend_msg() for more details on the parameters.
280 See rmr_stimeout() for info on setting the default timeout.
// Send using the context's default timeout: performs the same call-flag / call-id reset as
// rmr_mtosend_msg() (which repeats it) and passes -1 so the context default is used.
282 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
283 char* d1; // point at the call-id in the header
286 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
288 d1 = DATA1_ADDR( msg->header );
289 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
292 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
296 Return to sender allows a message to be sent back to the endpoint where it originated.
298 With SI95 it was thought that the return to sender would be along the same open connection
299 and thus no table lookup would be needed to open a 'reverse direction' path. However, for
300 applications sending at high message rates, returning responses on the same connection
301 causes major strife. Thus the decision was made to use the same method as NNG and just
302 open a second connection for reverse path.
304 We will attempt to use the name in the received message to look up the endpoint. If
305 that fails, then we will write on the connection that the message arrived on as a
308 On success (state is RMR_OK, the caller may use the buffer for another receive operation),
309 and on error it can be passed back to this function to retry the send if desired. On error,
310 errno will likely have the failure reason set by the transport send processing. The following
311 are possible values for the state in the message buffer:
313 Message states returned:
314 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
315 RMR_ERR_NOHDR - message did not have a header
316 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
317 RMR_ERR_SENDFAILED - send failed; errno has the transport error code
318 RMR_ERR_RETRY - the request failed but should be retried (EAGAIN)
320 A nil message as the return value is rare, and generally indicates some kind of horrible
321 failure. The value of errno might give a clue as to what is wrong.
324 Like send_msg(), this is non-blocking and will return the msg if there is an error.
325 The caller must check for this and handle it properly.
// Return a message to its sender. The endpoint is looked up by the header's src name, falling
// back to the connection the message arrived on (msg->rts_fd) and, for header versions above 2,
// to the src ip; if nothing resolves, the state is RMR_ERR_NOENDPT. Our own name is written into
// src for the send, then the original src/srcip are restored so a failed send can be retried.
327 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
328 int nn_sock; // endpoint socket for send
330 char* hold_src; // we need the original source if send fails
331 char* hold_ip; // also must hold original ip
332 int sock_ok = 0; // true if we found a valid endpoint socket
333 endpoint_t* ep = NULL; // end point to track counts
335 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
336 errno = EINVAL; // if msg is null, this is their clue
338 msg->state = RMR_ERR_BADARG;
339 msg->tp_state = errno;
344 errno = 0; // at this point any bad state is in msg returned
345 if( msg->header == NULL ) {
346 rmr_vlog( RMR_VL_ERR, "rmr_rts_msg: message had no header\n" ); // name this function, not send, so the log points at the real caller
347 msg->state = RMR_ERR_NOHDR;
348 msg->tp_state = errno;
352 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
354 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
356 if( (nn_sock = msg->rts_fd) < 0 ) {
357 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
358 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
361 msg->state = RMR_ERR_NOENDPT;
367 msg->state = RMR_OK; // ensure it is clear before send
368 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
369 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
370 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
371 msg = send_msg( ctx, msg, nn_sock, -1 );
373 incr_ep_counts( msg->state, ep ); // update counts
375 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always replace original source & ip so rts can be called again
376 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
377 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
386 If multi-threading call is turned on, this invokes that mechanism with the special call
387 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
388 behaviour (described below) is carried out. This is safe to use when mt is enabled, but
389 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
392 On timeout this function will return a nil pointer. If the original message could not
393 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
396 Call sends the message based on message routing using the message type, and waits for a
397 response message to arrive with the same transaction id that was in the outgoing message.
398 If, while waiting for the expected response, messages are received which do not have the
399 desired transaction ID, they are queued. Calls to rmr_rcv_msg() will dequeue them in the
400 order that they were received.
402 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
403 to ensure that no error was encountered. If the state is RMR_ERR_BADARG, then the message
404 may be resent (likely the context pointer was nil). If the message is sent, but no
405 response is received, a nil message is returned with errno set to indicate the likely
407 ETIMEDOUT -- too many messages were queued before receiving the expected response
408 ENOBUFS -- the queued message ring is full, messages were dropped
409 EINVAL -- A parameter was not valid
410 EAGAIN -- the underlying message system was interrupted or the device was busy;
411 user should call this function with the message again.
// Synchronous call: defers to mt_call() using the reserved call-id 1 and a 1000 ms wait limit.
// A nil context or message is rejected; when msg is available its state becomes RMR_ERR_BADARG.
414 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
417 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
419 msg->state = RMR_ERR_BADARG;
424 return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec
428 The outward facing receive function. When invoked it will pop the oldest message
429 from the receive ring, if any are queued, and return it. If the ring is empty
430 then the receive function is invoked to wait for the next message to arrive (blocking).
432 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
433 nil, a new one will be allocated. However, the caller should NOT expect to get the same
434 struct back (if a queued message is returned the message struct will be different).
// Blocking receive: defers to rmr_mt_rcv() with max_wait -1, which blocks until a message is ready.
// With a nil context, old_msg (if supplied) is marked RMR_ERR_BADARG and tp_state copies errno.
// NOTE(review): qm is not referenced in the visible lines and may be unused.
436 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
438 rmr_mbuf_t* qm; // message that was queued on the ring
440 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
442 if( old_msg != NULL ) {
443 old_msg->state = RMR_ERR_BADARG;
444 old_msg->tp_state = errno;
450 return rmr_mt_rcv( ctx, old_msg, -1 );
454 This allows a timeout based receive for applications unable to implement epoll_wait()
// Receive with a timeout of ms_to milliseconds, delegated to rmr_mt_rcv(). With a nil context,
// old_msg (if supplied) is marked RMR_ERR_BADARG and tp_state copies errno.
457 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
460 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
462 if( old_msg != NULL ) {
463 old_msg->state = RMR_ERR_BADARG;
464 old_msg->tp_state = errno;
469 return rmr_mt_rcv( ctx, old_msg, ms_to );
473 DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
474 too. This function likely will not behave as expected in SI, and we are pretty sure it
475 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
477 This blocks until the message with the 'expect' ID is received. Messages which are received
478 before the expected message are queued onto the message ring. The function will return
479 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
480 expected message is received. If the queued message ring fills a nil pointer is returned
481 and errno is set to ENOBUFS.
483 Generally this will be invoked only by the call() function as it waits for a response, but
484 it is exposed to the user application as there is no reason not to.
// DEPRECATED receive-until-match. It receives (blocking) until a message arrives whose xaction
// matches the first exp_len bytes of expect (exp_len is capped at RMR_MAX_XID). Non-matching
// messages are queued on ctx->mring, and the function gives up once allow2queue have been queued.
// A nil or empty expect degrades to a plain rmr_rcv_msg().
// NOTE(review): rmr_rcv_msg() reaches rmr_mt_rcv(), which extracts from the same ctx->mring that
// non-matching messages are pushed onto, so queued messages may simply be received again here.
// NOTE(review): memcmp over exp_len bytes is a prefix match when expect is shorter than RMR_MAX_XID.
486 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
488 int queued = 0; // number we pushed into the ring
489 int exp_len = 0; // length of expected ID
491 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
494 msg->state = RMR_ERR_BADARG;
495 msg->tp_state = errno;
502 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
503 return rmr_rcv_msg( ctx, msg );
506 exp_len = strlen( expect );
507 if( exp_len > RMR_MAX_XID ) {
508 exp_len = RMR_MAX_XID;
510 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
512 while( queued < allow2queue ) {
513 msg = rmr_rcv_msg( ctx, msg ); // hard wait for next
515 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state );
516 if( msg->state == RMR_OK ) {
517 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
518 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
522 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
523 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
528 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
535 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
541 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
542 _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
543 mechanism indicates EAGAIN or ETIMEDOUT. All other error conditions are reported
544 without this delay. Setting a timeout of 0 causes no retries to be attempted in
545 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
546 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
547 after every 1K send attempts until the "time" value is reached. Retries are abandoned
548 if anything other than EAGAIN or EINTR is returned by the transport.
550 The default, if this function is not used, is 1; meaning that RMr will retry, but will
551 not enter a sleep. In all cases the caller should check the status in the message returned
554 Returns -1 if the context was invalid; RMR_OK otherwise.
// Record the send retry/timeout setting in ctx->send_retries. A nil context is rejected
// (documented to return -1).
// NOTE(review): any validation of time between the guard and the assignment is not visible here.
556 extern int rmr_set_stimeout( void* vctx, int time ) {
559 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
567 ctx->send_retries = time;
572 Set receive timeout -- not supported in the SI implementation
574 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
// Receive timeouts are not supported with the SI transport: this only logs a warning and ignores
// both arguments.
// NOTE(review): the return statement is not visible in this extract.
576 extern int rmr_set_rtimeout( void* vctx, int time ) {
577 rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
582 Common cleanup on initialisation error. These are hard to force, and this helps to ensure
583 all code is tested by providing a callable rather than a block of "goto" code.
585 There is a return value so that where we need this we get dinked only for one
586 uncovered line rather than two:
590 That's a hack, and is yet another example of the testing tail wagging the dog.
// Shared init failure path. When errval is non-zero it replaces the errno that gets reported
// (the assignment itself is not visible here). The proto:port string (and, presumably, the
// context) is released, then msg is logged at CRIT level together with strerror(errno).
592 static inline void* init_err( char* msg, void* ctx, void* port, int errval ) {
593 if( errval != 0 ) { // if not letting it be what a syslib set it to...
597 if( port ) { // free things if allocated
604 if( msg ) { // crit message if supplied
605 rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s\n", msg, strerror( errno ) ); // callers of rmr_vlog supply the newline; without it the next log line runs on
612 This is the actual init workhorse. The user visible function merely ensures that the
613 calling programme does NOT set any internal flags that are supported, and then
614 invokes this. Internal functions (the route table collector) which need additional
615 open ports without starting additional route table collectors, will invoke this
616 directly with the proper flag.
618 CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
619 that we know about. The _user_ should ensure that the supplied length also
620 includes the trace data length maximum as they are in control of that.
// Context construction, in order:
//   - announce the library version (gated by the static 'announced' flag);
//   - copy the proto:port string;
//   - allocate and zero the context, and set up rivers, rings and call chutes;
//   - start SI95 and its listener;
//   - derive our name and source ip;
//   - create the empty route table and start the collector and receiver threads.
// Allocation and setup failures return through init_err().
622 static void* init( char* uproto_port, int def_msg_size, int flags ) {
623 static int announced = 0;
624 uta_ctx_t* ctx = NULL;
625 char bind_info[256]; // bind info
627 char* proto = "tcp"; // pointer into the proto/port string user supplied
628 char* port; // pointer into the proto_port buffer at the port value
629 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
631 char wbuf[1024]; // work buffer
632 char* tok; // pointer at token in a buffer
634 int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
639 old_vlevel = rmr_vlog_init(); // initialise and get the current level
642 rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version
643 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
644 uproto_port != NULL ? uproto_port : DEF_COMM_PORT, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); // %s must never receive a nil pointer
647 rmr_set_vlevel( old_vlevel ); // return logging to the desired state
648 uta_dump_env(); // spit out environment settings meaningful to us if in info mode
652 if( uproto_port == NULL ) {
653 proto_port = strdup( DEF_COMM_PORT );
654 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
656 proto_port = strdup( uproto_port ); // so we can modify it
659 if ( proto_port == NULL ){
660 return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM );
663 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
664 return init_err( "unable to allocate context", ctx, proto_port, ENOMEM );
666 memset( ctx, 0, sizeof( uta_ctx_t ) );
668 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating %d rivers\n", (int) MAX_RIVERS );
669 ctx->snarf_rt_fd = -1;
670 ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max
671 ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
if( ctx->rivers == NULL ) { // only ctx itself is allocated at this point, so release it directly
free( ctx );
return init_err( "unable to allocate river array", NULL, proto_port, ENOMEM );
}
672 ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to be hashed
673 memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
674 for( i = 0; i < ctx->nrivers; i++ ) {
675 ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
678 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
679 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
680 ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt
681 ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
683 ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
684 ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
686 if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
687 uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
688 uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
689 uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocation calls from userland require read lock, but can be fast
691 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
693 init_mtcall( ctx ); // set up call chutes
694 fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
697 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload length
698 if( def_msg_size > 0 ) {
699 ctx->max_plen = def_msg_size;
702 ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
703 if( ctx->si_ctx == NULL ) {
704 return init_err( "unable to initialise SI95 interface", ctx, proto_port, 0 ); // no newline: init_err appends ": <strerror>"
706 SIset_tflags(ctx->si_ctx,SI_TF_QUICK);
708 if( (port = strchr( proto_port, ':' )) != NULL ) {
709 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
712 *(port++) = 0; // term proto string and point at port string
713 proto = proto_port; // user supplied proto so point at it rather than default
716 port = proto_port; // assume something like "1234" was passed
718 rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
720 if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
724 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
725 tok = strdup( tok ); // something we can destroy
if( tok == NULL ) { // the dereference below must not see a nil pointer
return init_err( "unable to duplicate source id", ctx, proto_port, ENOMEM );
}
726 if( *tok == '[' ) { // we allow an ipv6 address here
727 tok2 = strchr( tok, ']' ); // we will chop the port (...]:port) if given
if( tok2 != NULL ) { // step past ']' only when present; (NULL + 1) would be dereferenced below
tok2++;
}
729 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
731 if( tok2 && *tok2 ) { // if it's not the end of string marker
732 *tok2 = 0; // make it so
735 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
738 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
739 return init_err( "cannot determine localhost name", ctx, proto_port, 0 );
741 if( (tok = strchr( wbuf, '.' )) != NULL ) {
742 *tok = 0; // we don't keep domain portion
746 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
if( ctx->my_name == NULL ) { // snprintf below must not write through a nil pointer
return init_err( "unable to allocate name buffer", ctx, proto_port, ENOMEM );
}
747 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
748 return init_err( "hostname + port is too long", ctx, proto_port, EINVAL );
751 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
752 if( atoi( tok ) > 0 ) {
753 flags |= RMRFL_NAME_ONLY; // don't allow IP address to go out in messages
757 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
758 my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
759 if( my_ip == NULL ) {
760 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
761 my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
764 if( flags & RMRFL_NAME_ONLY ) {
765 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
767 ctx->my_ip = strdup( my_ip );
769 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
771 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
773 ctx->flags |= CFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
777 if( (interface = getenv( ENV_BIND_IF )) == NULL ) { // if specific interface not defined, listen on all (IPv4, IPv6, or interface name)
779 compares the first ip sussed out by mk_ip_list (returned by get_default_ip)
780 NOTE: this may not work predictably in dual-stack setups where an interface can have both IPv4 and IPv6 addresses assigned,
781 meaning that it can select either IPv4 or IPv6 on applications restarts (depends on the order of IP addresses assigned on the interface)
783 if( my_ip[0] == '[' ) { // IPv6
786 interface = "0.0.0.0";
787 if( flags & RMRFL_NAME_ONLY ) { // if hostname is given instead of IP in RMR source address
788 rmr_vlog( RMR_VL_WARN, "rmr_init: hostname:ip is provided as source information for rts() calls, falling back to any IPv4\n" );
793 if( my_ip != NULL ) {
797 snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
798 if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
799 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
800 return init_err( NULL, ctx, proto_port, 0 );
803 // finish all flag setting before threads to keep helgrind quiet
804 ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
807 // ---------------- setup for route table collector before invoking ----------------------------------
808 ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) ); // single mutex required to gate access to moving rtables
809 if( ctx->rtgate != NULL ) {
810 pthread_mutex_init( ctx->rtgate, NULL );
813 ctx->ephash = rmr_sym_alloc( 129 ); // host:port to ep symtab exists outside of any route table
814 if( ctx->ephash == NULL ) {
815 return init_err( "unable to allocate ep hash", ctx, proto_port, ENOMEM );
818 ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used
819 if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases
820 ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case
822 ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread
825 rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port != NULL ? uproto_port : port );
826 if( (state = pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx )) != 0 ) { // kick the rt collector thread as just file reader; pthread_create returns the error, errno is not set
827 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s\n", strerror( state ) );
830 rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port != NULL ? uproto_port : port );
831 if( (state = pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx )) != 0 ) { // kick the real rt collector thread; error comes back as the return value
832 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s\n", strerror( state ) );
837 if( (state = pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx )) != 0 ) { // so kick it; error comes back as the return value
838 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s\n", strerror( state ) );
846 Initialise the message routing environment. Flags are one of the RMRFL_
847 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
848 (tcp) to be used, then :port is all that is needed.
850 At the moment it seems that TCP really is the only viable protocol, but
851 we'll allow flexibility.
853 The return value is a void pointer which must be passed to most rmr functions. On
854 error, a nil pointer is returned and errno should be set.
857 No user flags supported (needed) at the moment, but this provides for extension
858 without drastically changing anything. The user should invoke with RMRFL_NONE to
859 avoid any misbehaviour as there are internal flags which are supported
// Public initialiser: masks flags with UFL_MASK so applications cannot set internal-only flags,
// then hands everything to init().
861 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
862 return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
866 This sets the default trace length which will be added to any message buffers
867 allocated. It can be set at any time, and if rmr_set_trace() is given a
868 trace len that is different than the default allocated in a message, the message
871 Returns 0 on failure and 1 on success. If failure, then errno will be set.
// Set the default trace length (ctx->trace_data_len) used for subsequently allocated mbufs.
// Documented to return 1 on success and 0 (with errno set) on failure; a nil context is rejected.
// NOTE(review): no range check on tr_len is visible in this extract.
873 extern int rmr_init_trace( void* vctx, int tr_len ) {
877 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
882 ctx->trace_data_len = tr_len;
887 Return true if routing table is initialised etc. and app can send/receive.
// Report ctx->rtable_ready. init() sets it to 1 for RMRFL_NOTHREAD contexts and to 0 otherwise,
// leaving the route table collector to mark it ready. A nil context is rejected.
889 extern int rmr_ready( void* vctx ) {
892 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
896 return ctx->rtable_ready;
900 This returns the message queue ring's filedescriptor which can be used for
901 calls to epoll. The user should NOT read, write, or close the fd.
903 Returns the file descriptor or -1 on error.
// Expose the pollable fd of the receive ring (ctx->mring) via uta_ring_getpfd(), for use with epoll.
// A nil context is rejected (documented to return -1).
905 extern int rmr_get_rcvfd( void* vctx ) {
909 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
913 return uta_ring_getpfd( ctx->mring );
920 There isn't an si_flush() per se, but we can pause, generate
921 a context switch, which should allow the last sent buffer to
922 flow. There isn't exactly an nng_term/close either, so there
923 isn't much we can do.
// Best-effort shutdown: frees the seed route table file name and dumps SI transport statistics.
// SI sessions are not closed (see FIX ME).
// NOTE(review): seed_rt_fname is not reset to NULL in the visible lines; a second close could double-free.
925 extern void rmr_close( void* vctx ) {
928 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
932 if( ctx->seed_rt_fname != NULL ) {
933 free( ctx->seed_rt_fname );
938 SItp_stats( ctx->si_ctx ); // dump some interesting stats
940 // FIX ME -- how do we turn off si; close all sessions etc?
941 //SIclose( ctx->nn_sock );
946 // ----- multi-threaded call/receive support -------------------------------------------------
949 Blocks on the receive ring chute semaphore and then reads from the ring
950 when it is tickled. If max_wait is -1 then the function blocks until
951 a message is ready on the ring. Else max_wait is assumed to be the number
952 of milliseconds to wait before returning a timeout message.
954 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
957 struct timespec ts; // time info if we have a timeout
958 long new_ms; // adjusted mu-sec
959 long seconds = 0; // max wait seconds
960 long nano_sec; // max wait xlated to nano seconds
962 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
964 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
967 mbuf->state = RMR_ERR_BADARG;
968 mbuf->tp_state = errno;
973 ombuf = mbuf; // if we timeout we must return original msg with status, so save it
975 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
977 if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW!
978 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
979 clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expriry time
980 sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop)
982 rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
985 mbuf = ombuf; // return original if it was given with timeout status
986 if( ombuf != NULL ) {
987 mbuf->state = RMR_ERR_TIMEOUT; // preset if for failure
993 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
1000 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
1003 if( max_wait > 0 ) {
1004 clock_gettime( CLOCK_REALTIME, &ts ); // sem timeout based on clock, not a delta
1006 if( max_wait > 999 ) {
1007 seconds = max_wait / 1000;
1008 max_wait -= seconds * 1000;
1009 ts.tv_sec += seconds;
1011 if( max_wait > 0 ) {
1012 nano_sec = max_wait * 1000000;
1013 ts.tv_nsec += nano_sec;
1014 if( ts.tv_nsec > 999999999 ) {
1015 ts.tv_nsec -= 999999999;
1020 seconds = 1; // use as flag later to invoked timed wait
1025 while( state < 0 && errno == EINTR ) {
1027 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1029 state = sem_wait( &chute->barrier );
1034 mbuf = ombuf; // return caller's buffer if they passed one in
1036 errno = 0; // interrupted call state could be left; clear
1037 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
1038 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
1039 mbuf->state = RMR_OK;
1040 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
1043 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
1047 mbuf = ombuf; // no buffer, return user's if there
1052 mbuf->tp_state = errno;
1061 This is the work horse for the multi-threaded call() function. It supports
1062 both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
1063 for for rmr_mt_call() modulo the caveat below.
1065 If endpoint is given, then we assume that we're not doing normal route table
1066 routing and that we should send directly to that endpoint (probably worm
1069 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1070 rmr_mbuf_t* ombuf; // original mbuf passed in
1072 uta_mhdr_t* hdr; // header in the transport buffer
1074 unsigned char* d1; // d1 data in header
1075 struct timespec ts; // time info if we have a timeout
1076 long new_ms; // adjusted mu-sec
1077 long seconds = 0; // max wait seconds
1078 long nano_sec; // max wait xlated to nano seconds
1082 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1084 mbuf->tp_state = errno;
1085 mbuf->state = RMR_ERR_BADARG;
1090 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1091 mbuf->state = RMR_ERR_NOTSUPP;
1092 mbuf->tp_state = errno;
1096 ombuf = mbuf; // save to return timeout status with
1098 chute = &ctx->chutes[call_id];
1099 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
1100 rmr_free_msg( chute->mbuf );
1104 hdr = (uta_mhdr_t *) mbuf->header;
1105 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
1106 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
1107 d1 = DATA1_ADDR( hdr );
1108 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
1109 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
1111 if( max_wait >= 0 ) {
1112 clock_gettime( CLOCK_REALTIME, &ts );
1114 if( max_wait > 999 ) {
1115 seconds = max_wait / 1000;
1116 max_wait -= seconds * 1000;
1117 ts.tv_sec += seconds;
1119 if( max_wait > 0 ) {
1120 nano_sec = max_wait * 1000000;
1121 ts.tv_nsec += nano_sec;
1122 if( ts.tv_nsec > 999999999 ) {
1123 ts.tv_nsec -= 999999999;
1128 seconds = 1; // use as flag later to invoked timed wait
1131 if( ep == NULL ) { // normal routing
1132 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1134 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1137 if( mbuf->state != RMR_OK ) {
1138 mbuf->tp_state = errno;
1139 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1145 while( chute->mbuf == NULL && ! errno ) {
1147 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1149 state = sem_wait( &chute->barrier );
1152 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1156 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1157 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1158 rmr_free_msg( chute->mbuf );
1166 return NULL; // leave errno as set by sem wait call
1170 if( mbuf != NULL ) {
1171 mbuf->state = RMR_OK;
1179 Accept a message buffer and caller ID, send the message and then wait
1180 for the receiver to tickle the semaphore letting us know that a message
1181 has been received. The call_id is a value between 2 and 255, inclusive; if
1182 it's not in this range an error will be returned. Max wait is the amount
1183 of time in millaseconds that the call should block for. If 0 is given
1184 then no timeout is set.
1186 If the mt_call feature has not been initialised, then the attempt to use this
1187 funciton will fail with RMR_ERR_NOTSUPP
1189 If no matching message is received before the max_wait period expires, a
1190 nil pointer is returned, and errno is set to ETIMEOUT. If any other error
1191 occurs after the message has been sent, then a nil pointer is returned
1192 with errno set to some other value.
1194 This is now just an outward facing wrapper so we can support wormhole calls.
1196 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1198 // must vet call_id here, all others vetted by workhorse mt_call() function
1199 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
1200 if( mbuf != NULL ) {
1201 mbuf->state = RMR_ERR_BADARG;
1202 mbuf->tp_state = EINVAL;
1207 return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1212 Given an existing message buffer, reallocate the payload portion to
1213 be at least new_len bytes. The message header will remain such that
1214 the caller may use the rmr_rts_msg() function to return a payload
1217 The mbuf passed in may or may not be reallocated and the caller must
1218 use the returned pointer and should NOT assume that it can use the
1219 pointer passed in with the exceptions based on the clone flag.
1221 If the clone flag is set, then a duplicated message, with larger payload
1222 size, is allocated and returned. The old_msg pointer in this situation is
1223 still valid and must be explicitly freed by the application. If the clone
1224 message is not set (0), then any memory management of the old message is
1225 handled by the function.
1227 If the copy flag is set, the contents of the old message's payload is
1228 copied to the reallocated payload. If the flag is not set, then the
1229 contents of the payload is undetermined.
1231 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1232 if( old_msg == NULL ) {
1236 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
1240 Enable low latency things in the transport (when supported).
1242 extern void rmr_set_low_latency( void* vctx ) {
1245 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1246 if( ctx->si_ctx != NULL ) {
1247 SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1255 extern void rmr_set_fack( void* vctx ) {
1258 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1259 if( ctx->si_ctx != NULL ) {
1260 SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );