1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the si version of the rmr
24 library (formerly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised compile using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
59 #define SI95_BUILD 1 // we drop some common functions for si
61 #include "rmr.h" // things the users see
62 #include "rmr_agnostic.h" // agnostic things (must be included before private)
63 #include "rmr_si_private.h" // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
67 #include "ring_static.c" // message ring support
68 #include "rt_generic_static.c" // route table things not transport specific
69 #include "rtable_si_static.c" // route table things -- transport specific
70 #include "rtc_static.c" // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c" // send/receive static functions
73 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
78 //------------------------------------------------------------------------------
// Release resources owned by a context. The visible statements free the route table
// generator address, the message and zero-copy mbuf rings, the fd->endpoint symtab and
// the route table hash.
84 static void free_ctx( uta_ctx_t* ctx ) {
87 free( ctx->rtg_addr );
89 uta_ring_free( ctx->mring );
90 uta_ring_free( ctx->zcb_mring );
95 rmr_sym_free( ctx->fd2ep );
// NOTE(review): ctx->rtable is dereferenced here; init() only creates the rtable late
// (after several failure points), so confirm a NULL guard exists for every caller.
104 rmr_sym_free( ctx->rtable->hash );
114 // --------------- public functions --------------------------------------------------------------------------
117 Returns the size of the payload (bytes) that the msg buffer references.
118 Len in a message is the number of bytes which were received, or should
119 be transmitted, however, it is possible that the mbuf was allocated
120 with a larger payload space than the payload length indicates; this
121 function returns the absolute maximum space that the user has available
122 in the payload. On error (bad msg buffer) -1 is returned and errno should
125 The allocated len stored in the msg is:
126 transport header length +
128 user requested payload
130 The msg header is a combination of the fixed RMR header and the variable
131 trace data and d2 fields which may vary for each message.
// Return the usable payload capacity of msg: the allocated length less the RMR header
// (fixed part plus variable trace/d1/d2 data) and the transport header length.
133 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
134 if( msg == NULL || msg->header == NULL ) {
// (bad buffer: per the description above, -1 is returned and errno should be set)
140 return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN; // allocated transport size less the header and other data bits
144 Allocates a send message as a zerocopy message allowing the underlying message protocol
145 to send the buffer without copy.
// Allocate a zero-copy send buffer. size is passed to alloc_zcmsg() as the requested
// user payload size, along with the default trace data length (DEF_TR_LEN).
// vctx must be a context returned by rmr_init().
147 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
151 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
155 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
161 Allocates a send message as a zerocopy message allowing the underlying message protocol
162 to send the buffer without copy. In addition, a trace data field of tr_size will be
163 added and the supplied data copied to the buffer before returning the message to
// Allocate a zero-copy send buffer with a trace data area of tr_size bytes and copy
// data into it. If rmr_set_trace() returns anything other than tr_size, the buffer's
// state is set to RMR_ERR_INITFAILED.
166 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
171 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
175 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
// NOTE(review): m is handed to rmr_set_trace() and dereferenced below without a visible
// NULL check on alloc_zcmsg()'s result; confirm a guard exists.
177 state = rmr_set_trace( m, data, tr_size ); // roll their data in
178 if( state != tr_size ) {
179 m->state = RMR_ERR_INITFAILED;
187 This provides an external path to the realloc static function as it's called by an
188 outward facing mbuf api function. Used to reallocate a message with a different
// Public wrapper for the static realloc_msg(). msg and new_tr_size are passed through
// unchanged, and its result is returned as-is.
191 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
192 return realloc_msg( msg, new_tr_size );
197 Return the message to the available pool, or free it outright.
// Release an mbuf. It is cached on its ring when possible; otherwise its transport
// buffer is freed and the cookie cleared.
199 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
// The condition short-circuits: uta_ring_insert() is only attempted for non-huge mbufs
// that have a ring. A successful insert caches the mbuf, so nothing is freed.
204 if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
205 ! mbuf->ring || // can't cache if no ring
206 ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
209 free( mbuf->tp_buf );
210 mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
213 mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
219 This is a wrapper to the real timeout send. We must wrap it now to ensure that
220 the call flag and call-id are reset
// Send msg with a max timeout of max_to. The call flag and call-id in the header are
// cleared first, so the message is not queued on a call chute at the receiver.
222 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
223 char* d1; // point at the call-id in the header
// NOTE(review): msg and msg->header are dereferenced below; confirm a NULL guard covers
// these lines so mtosend_msg() can report a bad argument instead of crashing.
226 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
228 d1 = DATA1_ADDR( msg->header );
229 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
232 return mtosend_msg( vctx, msg, max_to );
236 Send with default max timeout as is set in the context.
237 See rmr_mtosend_msg() for more details on the parameters.
238 See rmr_stimeout() for info on setting the default timeout.
240 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
241 char* d1; // point at the call-id in the header
244 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
246 d1 = DATA1_ADDR( msg->header );
247 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
250 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
254 Return to sender allows a message to be sent back to the endpoint where it originated.
256 With SI95 it was thought that the return to sender would be along the same open connection
257 and thus no table lookup would be needed to open a 'reverse direction' path. However, for
258 applications sending at high message rates, returning responses on the same connection
259 causes major strife. Thus the decision was made to use the same method as NNG and just
260 open a second connection for reverse path.
262 We will attempt to use the name in the received message to look up the endpoint. If
263 that fails, then we will write on the connection that the message arrived on as a
266 On success (state is RMR_OK, the caller may use the buffer for another receive operation),
267 and on error it can be passed back to this function to retry the send if desired. On error,
268 errno will likely have the failure reason set by the transport send processing. The following
269 are possible values for the state in the message buffer:
271 Message states returned:
272 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
273 RMR_ERR_NOHDR - message did not have a header
274 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
275 RMR_ERR_SENDFAILED - send failed; errno has nano error code
276 RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN)
278 A nil message as the return value is rare, and generally indicates some kind of horrible
279 failure. The value of errno might give a clue as to what is wrong.
282 Like send_msg(), this is non-blocking and will return the msg if there is an error.
283 The caller must check for this and handle it properly.
// Send msg back to the endpoint it came from.
// Returns msg with msg->state set to one of:
//   RMR_ERR_BADARG, RMR_ERR_NOHDR, RMR_ERR_NOENDPT, or the state left by send_msg().
// After the send, the header's src and srcip are restored so rts can be called again.
285 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
286 int nn_sock; // endpoint socket for send
288 char* hold_src; // we need the original source if send fails
289 char* hold_ip; // also must hold original ip
290 int sock_ok = 0; // true if we found a valid endpoint socket
291 endpoint_t* ep = NULL; // end point to track counts
293 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
294 errno = EINVAL; // if msg is null, this is their clue
// NOTE(review): this branch is also taken when msg == NULL; confirm msg is checked before
// the two writes below.
296 msg->state = RMR_ERR_BADARG;
297 msg->tp_state = errno;
302 errno = 0; // at this point any bad state is in msg returned
303 if( msg->header == NULL ) {
// NOTE(review): the log text names rmr_send_msg although this is rmr_rts_msg.
304 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
305 msg->state = RMR_ERR_NOHDR;
306 msg->tp_state = errno;
310 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
// Endpoint lookup order:
//   1. the sender's src name;
//   2. the fd the message arrived on (msg->rts_fd);
//   3. for header versions > 2, the src ip.
// Confirm the fallbacks only run when the name lookup fails.
312 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
314 if( (nn_sock = msg->rts_fd) < 0 ) {
315 if( HDR_VERSION( msg->header ) > 2 ) { // src ip lookup only for header versions > 2; try if src name not known
316 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
319 msg->state = RMR_ERR_NOENDPT;
326 msg->state = RMR_OK; // ensure it is clear before send
// NOTE(review): strdup() results are not checked. On allocation failure the NULLs reach
// zt_buf_fill() at the restore step below.
327 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
328 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
329 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
330 msg = send_msg( ctx, msg, nn_sock, -1 );
// NOTE(review): ep is only set by uta_epsock_byname(). When the rts_fd path was used it
// stays NULL, so the counter updates below need an ep != NULL guard.
333 switch( msg->state ) {
335 ep->scounts[EPSC_GOOD]++;
339 ep->scounts[EPSC_TRANS]++;
343 // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed
344 ep->scounts[EPSC_FAIL]++;
348 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always replace original source & ip so rts can be called again
349 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
350 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
359 If multi-threading call is turned on, this invokes that mechanism with the special call
360 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
361 behaviour (described below) is carried out. This is safe to use when mt is enabled, but
362 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
365 On timeout this function will return a nil pointer. If the original message could not
366 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
369 Call sends the message based on message routing using the message type, and waits for a
370 response message to arrive with the same transaction id that was in the outgoing message.
371 If, while waiting for the expected response, messages are received which do not have the
372 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
373 order that they were received.
375 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
376 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
377 may be resent (likely the context pointer was nil). If the message is sent, but no
378 response is received, a nil message is returned with errno set to indicate the likely
380 ETIMEDOUT -- too many messages were queued before receiving the expected response
381 ENOBUFS -- the queued message ring is full, messages were dropped
382 EINVAL -- A parameter was not valid
383 EAGAIN -- the underlying message system was interrupted or the device was busy;
384 user should call this function with the message again.
387 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
390 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
392 msg->state = RMR_ERR_BADARG;
397 return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec
401 The outward facing receive function. When invoked it will pop the oldest message
402 from the receive ring, if any are queued, and return it. If the ring is empty
403 then the receive function is invoked to wait for the next message to arrive (blocking).
405 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
406 nil, a new one will be allocated. However, the caller should NOT expect to get the same
407 struct back (if a queued message is returned the message struct will be different).
// Blocking receive. After validating the context, this delegates to rmr_mt_rcv() with
// max_wait of -1 (block until a message is available).
409 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
// NOTE(review): qm is not referenced in any visible line; possibly a leftover from an
// earlier implementation and removable.
411 rmr_mbuf_t* qm; // message that was queued on the ring
413 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// bad context: report through old_msg when the caller supplied one
415 if( old_msg != NULL ) {
416 old_msg->state = RMR_ERR_BADARG;
417 old_msg->tp_state = errno;
423 return rmr_mt_rcv( ctx, old_msg, -1 );
427 This allows a timeout based receive for applications unable to implement epoll_wait()
// Receive with a timeout. After validating the context, this delegates to rmr_mt_rcv(),
// passing ms_to as the maximum wait in milliseconds.
430 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
433 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// bad context: report through old_msg when the caller supplied one
435 if( old_msg != NULL ) {
436 old_msg->state = RMR_ERR_BADARG;
437 old_msg->tp_state = errno;
442 return rmr_mt_rcv( ctx, old_msg, ms_to );
446 DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
447 too. This function likely will not behave as expected in SI, and we are pretty sure it
448 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
450 This blocks until the message with the 'expect' ID is received. Messages which are received
451 before the expected message are queued onto the message ring. The function will return
452 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
453 expected message is received. If the queued message ring fills a nil pointer is returned
454 and errno is set to ENOBUFS.
456 Generally this will be invoked only by the call() function as it waits for a response, but
457 it is exposed to the user application as there is no reason not to.
// DEPRECATED. Receive until a message whose first exp_len xaction bytes match expect
// arrives. Up to allow2queue non-matching messages are pushed onto ctx->mring along the way.
// NOTE(review): ctx->mring is the same ring rmr_mt_rcv() extracts from, so queued messages
// can be handed back by the rmr_rcv_msg() call inside this loop. This is presumably why the
// function is deprecated for SI.
459 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
461 int queued = 0; // number we pushed into the ring
462 int exp_len = 0; // length of expected ID
464 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// NOTE(review): msg may be nil here; confirm it is checked before these writes.
467 msg->state = RMR_ERR_BADARG;
468 msg->tp_state = errno;
475 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
476 return rmr_rcv_msg( ctx, msg );
479 exp_len = strlen( expect );
// compare at most RMR_MAX_XID bytes, the size of the xaction field
480 if( exp_len > RMR_MAX_XID ) {
481 exp_len = RMR_MAX_XID;
483 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
485 while( queued < allow2queue ) {
486 msg = rmr_rcv_msg( ctx, msg ); // hard wait for next
// NOTE(review): rmr_mt_rcv() can return nil (e.g. nothing extracted and no buffer passed
// in); msg->state is read below without a visible nil check.
488 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state );
489 if( msg->state == RMR_OK ) {
490 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
// NOTE(review): xaction is compared as raw bytes above; printing it with %s assumes it is
// NUL-terminated. Confirm.
491 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
495 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
496 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
501 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
508 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
514 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
515 _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
516 mechanism indicates EAGAIN or ETIMEDOUT. All other error conditions are reported
517 without this delay. Setting a timeout of 0 causes no retries to be attempted in
518 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
519 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
520 after every 1K send attempts until the "time" value is reached. Retries are abandoned
521 if the transport returns anything other than EAGAIN or EINTR.
523 The default, if this function is not used, is 1; meaning that RMr will retry, but will
524 not enter a sleep. In all cases the caller should check the status in the message returned
527 Returns -1 if the context was invalid; RMR_OK otherwise.
// Store time as the context's send retry/timeout setting (ctx->send_retries).
// The returned value and the retry semantics are described above.
529 extern int rmr_set_stimeout( void* vctx, int time ) {
532 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
540 ctx->send_retries = time;
545 Set receive timeout -- not supported in nng implementation
547 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
// Receive timeouts are not supported for SI. This only logs a warning; time is not applied.
549 extern int rmr_set_rtimeout( void* vctx, int time ) {
550 rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
556 This is the actual init workhorse. The user visible function merely ensures that the
557 calling programme does NOT set any internal flags that are supported, and then
558 invokes this. Internal functions (the route table collector) which need additional
559 open ports without starting additional route table collectors, will invoke this
560 directly with the proper flag.
562 CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
563 that we know about. The _user_ should ensure that the supplied length also
564 includes the trace data length maximum as they are in control of that.
// Allocate and initialise a context. Steps:
//   1. parse "proto:port";
//   2. determine our name and ip;
//   3. start the SI listener;
//   4. start the route table collector and the multi-threaded receiver threads.
// rmr_init() returns the result (nil on error, per its description).
566 static void* init( char* uproto_port, int def_msg_size, int flags ) {
567 static int announced = 0;
568 uta_ctx_t* ctx = NULL;
569 char bind_info[256]; // bind info
570 char* proto = "tcp"; // pointer into the proto/port string user supplied
571 char* port; // pointer into the proto_port buffer at the port value
572 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
574 char wbuf[1024]; // work buffer
575 char* tok; // pointer at token in a buffer
577 int static_rtc = 0; // if rtg env var is < 0, then we set and don't listen on a port
582 old_vlevel = rmr_vlog_init(); // initialise and get the current level
585 rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version
586 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
587 RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
590 rmr_set_vlevel( old_vlevel ); // return logging to the desired state
591 uta_dump_env(); // spit out environment settings meaningful to us if in info mode
// a NULL uproto_port selects DEF_COMM_PORT; either way we work on a writable copy
595 if( uproto_port == NULL ) {
596 proto_port = strdup( DEF_COMM_PORT );
598 proto_port = strdup( uproto_port ); // so we can modify it
601 if ( proto_port == NULL ){
606 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
610 memset( ctx, 0, sizeof( uta_ctx_t ) );
// NOTE(review): the debug text says 266 rivers but nrivers is set to 256 on the next line.
612 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
613 ctx->nrivers = 256; // number of input flows we'll manage
// NOTE(review): malloc() result is passed to memset() without a NULL check.
614 ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
615 memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
616 for( i = 0; i < ctx->nrivers; i++ ) {
617 ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
620 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
621 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
622 ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt
623 ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
625 ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
626 ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
628 if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
629 uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
630 uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
631 uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocation calls from userland require read lock, but can be fast
633 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
635 init_mtcall( ctx ); // set up call chutes
636 fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
639 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload length
640 if( def_msg_size > 0 ) {
641 ctx->max_plen = def_msg_size;
644 ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
645 if( ctx->si_ctx == NULL ) {
646 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
// split "proto:port", ":port" or "port"; the proto_port copy is modified in place
650 if( (port = strchr( proto_port, ':' )) != NULL ) {
651 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
654 *(port++) = 0; // term proto string and point at port string
655 proto = proto_port; // user supplied proto so point at it rather than default
658 port = proto_port; // assume something like "1234" was passed
661 if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
665 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
666 tok = strdup( tok ); // something we can destroy
667 if( *tok == '[' ) { // we allow an ipv6 address here
// NOTE(review): if tok has '[' but no ']', strchr() returns NULL and NULL + 1 is undefined.
// The `tok2 &&` test below cannot catch it. Check strchr()'s result before adding 1.
668 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
670 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
672 if( tok2 && *tok2 ) { // if it's not the end of string marker
673 *tok2 = 0; // make it so
676 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
679 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
680 rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
683 if( (tok = strchr( wbuf, '.' )) != NULL ) {
684 *tok = 0; // we don't keep domain portion
// NOTE(review): malloc() result is used by snprintf() without a NULL check.
688 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
689 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
690 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
695 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
696 if( atoi( tok ) > 0 ) {
697 flags |= RMRFL_NAME_ONLY; // don't allow IP address to go out in messages
701 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
702 if( flags & RMRFL_NAME_ONLY ) {
703 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
705 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
706 if( ctx->my_ip == NULL ) {
707 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
708 ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
711 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
713 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
715 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
720 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
721 interface = "0.0.0.0";
724 snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
725 if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
726 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
730 // finish all flag setting before threads to keep helgrind quiet
731 ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
734 // ---------------- setup for route table collector before invoking ----------------------------------
735 ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) ); // single mutex required to gate access to moving rtables
736 if( ctx->rtgate != NULL ) {
737 pthread_mutex_init( ctx->rtgate, NULL );
740 ctx->ephash = rmr_sym_alloc( 129 ); // host:port to ep symtab exists outside of any route table
741 if( ctx->ephash == NULL ) {
742 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
747 ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used
748 if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases
749 ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case
751 ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread
// NOTE(review): uproto_port may be NULL (accepted above, with DEF_COMM_PORT used instead),
// and passing NULL for %s is undefined. `port` holds the effective value.
// NOTE(review): pthread_create() returns the error number and is not specified to set
// errno, so strerror( errno ) in the three thread-start warnings may report an unrelated
// error. Those messages also lack the trailing \n the other rmr_vlog() calls use.
754 rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
755 if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
756 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
759 rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
760 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
761 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
766 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
767 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
780 Initialise the message routing environment. Flags are one of the UTAFL_
781 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
782 (tcp) to be used, then :port is all that is needed.
784 At the moment it seems that TCP really is the only viable protocol, but
785 we'll allow flexibility.
787 The return value is a void pointer which must be passed to most uta functions. On
788 error, a nil pointer is returned and errno should be set.
791 No user flags supported (needed) at the moment, but this provides for extension
792 without drastically changing anything. The user should invoke with RMRFL_NONE to
793 avoid any misbehaviour as there are internal flags which are supported
// Public initialiser. Internal-only flags are stripped (flags & UFL_MASK) before the
// arguments are passed to init(), whose result is returned.
795 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
796 return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
800 This sets the default trace length which will be added to any message buffers
801 allocated. It can be set at any time, and if rmr_set_trace() is given a
802 trace len that is different than the default allocated in a message, the message
805 Returns 0 on failure and 1 on success. If failure, then errno will be set.
// Record tr_len as the context's default trace data length (ctx->trace_data_len).
807 extern int rmr_init_trace( void* vctx, int tr_len ) {
811 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
816 ctx->trace_data_len = tr_len;
821 Return true if routing table is initialised etc. and app can send/receive.
// Report readiness by returning ctx->rtable_ready. init() sets it; it is presumably
// updated by the route table collector once a table is loaded.
823 extern int rmr_ready( void* vctx ) {
826 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
830 return ctx->rtable_ready;
834 This returns the message queue ring's file descriptor which can be used for
835 calls to epoll. The user should NOT read, write, or close the fd.
837 Returns the file descriptor or -1 on error.
// Return the pollable fd associated with the context's receive ring
// (uta_ring_getpfd( ctx->mring )).
839 extern int rmr_get_rcvfd( void* vctx ) {
843 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
847 return uta_ring_getpfd( ctx->mring );
854 There isn't an si_flush() per se, but we can pause, generate
855 a context switch, which should allow the last sent buffer to
856 flow. There isn't exactly an nng_term/close either, so there
857 isn't much we can do.
// Close a context. In the visible lines this only dumps SI transport statistics;
// SI sessions are not closed (see the FIX ME below).
859 extern void rmr_close( void* vctx ) {
862 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
868 SItp_stats( ctx->si_ctx ); // dump some interesting stats
870 // FIX ME -- how do we turn off si; close all sessions etc?
871 //SIclose( ctx->nn_sock );
876 // ----- multi-threaded call/receive support -------------------------------------------------
879 Blocks on the receive ring chute semaphore and then reads from the ring
880 when it is tickled. If max_wait is -1 then the function blocks until
881 a message is ready on the ring. Else max_wait is assumed to be the number
882 of milliseconds to wait before returning a timeout message.
// Receive from the context's message ring, gated by chute 0's semaphore.
// max_wait values:
//   0  -- non-blocking poll;
//   -1 -- block until a message arrives;
//   >0 -- timeout in milliseconds.
// When a message is taken from the ring, the caller's mbuf is freed, because ring mbufs
// cannot be reused. Otherwise the caller's mbuf is returned with a timeout/error state.
884 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
887 struct timespec ts; // time info if we have a timeout
// NOTE(review): new_ms is not referenced in any visible line.
888 long new_ms; // adjusted mu-sec
889 long seconds = 0; // max wait seconds
890 long nano_sec; // max wait xlated to nano seconds
892 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
894 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
897 mbuf->state = RMR_ERR_BADARG;
898 mbuf->tp_state = errno;
903 ombuf = mbuf; // if we timeout we must return original msg with status, so save it
905 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
907 if( max_wait == 0 ) { // one shot poll; handle without sem check as that is SLOW!
908 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
// An absolute deadline of "now" makes sem_timedwait() decrement the count without
// blocking. NOTE(review): sem_trywait() would express this more directly.
909 clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expiry time
910 sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop)
912 rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
915 mbuf = ombuf; // return original if it was given with timeout status
916 if( ombuf != NULL ) {
917 mbuf->state = RMR_ERR_TIMEOUT; // preset it for failure
923 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
930 ombuf->state = RMR_ERR_TIMEOUT; // preset it for failure
// Build an absolute CLOCK_REALTIME deadline, as sem_timedwait() requires.
934 clock_gettime( CLOCK_REALTIME, &ts ); // sem timeout based on clock, not a delta
936 if( max_wait > 999 ) {
937 seconds = max_wait / 1000;
938 max_wait -= seconds * 1000;
939 ts.tv_sec += seconds;
942 nano_sec = max_wait * 1000000;
943 ts.tv_nsec += nano_sec;
// NOTE(review): normalisation should subtract 1000000000, not 999999999.
// The current value keeps tv_nsec in range but makes the deadline 1ns late.
// The matching tv_sec carry is not visible here; confirm it exists.
944 if( ts.tv_nsec > 999999999 ) {
945 ts.tv_nsec -= 999999999;
950 seconds = 1; // use as flag later to invoke timed wait
// retry the semaphore wait when interrupted by a signal
955 while( state < 0 && errno == EINTR ) {
957 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
959 state = sem_wait( &chute->barrier );
964 mbuf = ombuf; // return caller's buffer if they passed one in
966 errno = 0; // interrupted call state could be left; clear
967 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
968 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
969 mbuf->state = RMR_OK;
970 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
973 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
977 mbuf = ombuf; // no buffer, return user's if there
982 mbuf->tp_state = errno;
991 This is the work horse for the multi-threaded call() function. It supports
992 both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
993 for rmr_mt_call() modulo the caveat below.
995 If endpoint is given, then we assume that we're not doing normal route table
996 routing and that we should send directly to that endpoint (probably worm
// mt_call -- workhorse behind rmr_mt_call() and the wormhole wh_call().
// Flags mbuf as a call message, records its xaction in the chute for call_id,
// stamps call_id into the header's D1 area, sends it (route table when ep is
// NULL, otherwise directly on ep->nn_sock) and then blocks on
// ctx->chutes[call_id].barrier until a response whose xaction matches the one
// sent is placed in the chute, or the wait fails.
//
// Parameters:
//   vctx     - RMR context (cast to uta_ctx_t*)
//   mbuf     - message to send; its xaction is the one waited for
//   call_id  - index into ctx->chutes; not range-checked in the visible code
//              (rmr_mt_call() vets it before calling)
//   max_wait - milliseconds; any value >= 0 builds an absolute CLOCK_REALTIME
//              deadline and arms the timed wait; presumably a negative value
//              selects sem_wait() with no timeout (the selecting `if` is not
//              visible in this listing)
//   ep       - endpoint to send directly to, or NULL for normal routing
//
// Returns:
//   - mbuf with state RMR_ERR_BADARG or RMR_ERR_NOTSUPP on bad arguments or
//     when CFL_MTC_ENABLED is off (the return statements are not shown here)
//   - the buffer handed back by the send when its state != RMR_OK
//   - NULL when the semaphore wait fails; errno is left as set by the wait
//   - otherwise the response with state RMR_OK (presumably taken from
//     chute->mbuf on a line not visible in this listing)
999 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1000 rmr_mbuf_t* ombuf; // original mbuf passed in
1002 uta_mhdr_t* hdr; // header in the transport buffer
1004 unsigned char* d1; // d1 data in header
1005 struct timespec ts; // time info if we have a timeout
1006 long new_ms; // adjusted mu-sec; NOTE(review): appears unused in this listing
1007 long seconds = 0; // max wait seconds
1008 long nano_sec; // max wait xlated to nano seconds
// Argument validation.
// NOTE(review): this branch is entered when mbuf == NULL, yet it writes
// mbuf->tp_state; confirm a NULL guard exists on a line not shown here.
1012 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1014 mbuf->tp_state = errno;
1015 mbuf->state = RMR_ERR_BADARG;
// Multi-threaded call support must have been enabled on this context.
1020 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1021 mbuf->state = RMR_ERR_NOTSUPP;
1022 mbuf->tp_state = errno;
1026 ombuf = mbuf; // save to return timeout status with
1028 chute = &ctx->chutes[call_id];
1029 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
1030 rmr_free_msg( chute->mbuf );
// Mark the message as a call and record what the response must carry:
// the xaction (checked in the wait loop below) and the caller's call_id.
1034 hdr = (uta_mhdr_t *) mbuf->header;
1035 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
1036 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
1037 d1 = DATA1_ADDR( hdr );
1038 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
1039 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
// Convert max_wait (ms) into an absolute deadline: sem_timedwait() takes an
// absolute CLOCK_REALTIME time, not a relative delay.
1041 if( max_wait >= 0 ) {
1042 clock_gettime( CLOCK_REALTIME, &ts );
1044 if( max_wait > 999 ) {
1045 seconds = max_wait / 1000;
1046 max_wait -= seconds * 1000;
1047 ts.tv_sec += seconds;
1049 if( max_wait > 0 ) {
1050 nano_sec = max_wait * 1000000;
1051 ts.tv_nsec += nano_sec;
// NOTE(review): normalising by 999999999 leaves tv_nsec one nanosecond too
// large; the exact carry is `ts.tv_nsec -= 1000000000;` (assumes the matching
// tv_sec increment happens on a line not visible in this listing -- confirm).
1052 if( ts.tv_nsec > 999999999 ) {
1053 ts.tv_nsec -= 999999999;
1058 seconds = 1; // use as flag later to invoke timed wait
// Send the message. With MFL_NOALLOC set, a successful send is expected to
// return NULL (see the comment on the mtosend_msg() call).
1061 if( ep == NULL ) { // normal routing
1062 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1064 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
// NOTE(review): mbuf may be NULL here after a successful send (per the
// comment above); confirm a NULL check precedes this dereference.
1067 if( mbuf->state != RMR_OK ) {
1068 mbuf->tp_state = errno;
1069 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
// Wait for a matching response. EINTR restarts the wait; any other failure
// (e.g. ETIMEDOUT from sem_timedwait) leaves errno set and ends the loop.
1075 while( chute->mbuf == NULL && ! errno ) {
1077 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1079 state = sem_wait( &chute->barrier );
1082 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
// A response whose xaction differs from the one sent (presumably a late
// reply to an earlier call) is freed and the wait continues.
1086 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1087 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1088 rmr_free_msg( chute->mbuf );
1096 return NULL; // leave errno as set by sem wait call
// Successful completion: mark the response as OK.
1100 if( mbuf != NULL ) {
1101 mbuf->state = RMR_OK;
1109 Accept a message buffer and caller ID, send the message and then wait
1110 for the receiver to tickle the semaphore letting us know that a message
1111 has been received. The call_id is a value between 2 and 255, inclusive; if
1112 it's not in this range an error will be returned. Max wait is the amount
1113 of time in milliseconds that the call should block for. If a negative value
1114 is given then no timeout is set; 0 times out immediately unless a response has already arrived.
1116 If the mt_call feature has not been initialised, then the attempt to use this
1117 function will fail with RMR_ERR_NOTSUPP
1119 If no matching message is received before the max_wait period expires, a
1120 nil pointer is returned, and errno is set to ETIMEDOUT. If any other error
1121 occurs after the message has been sent, then a nil pointer is returned
1122 with errno set to some other value.
1124 This is now just an outward facing wrapper so we can support wormhole calls.
// rmr_mt_call -- public entry point for a multi-threaded call.
// Range-checks call_id (must be 2..MAX_CALL_ID; 0 and 1 are reserved) and then
// delegates to mt_call() with a NULL endpoint, i.e. normal route-table routing.
// On a bad call_id a non-NULL mbuf gets state RMR_ERR_BADARG and tp_state
// EINVAL; the return statement for that path is not visible in this listing.
1126 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1128 // must vet call_id here, all others vetted by workhorse mt_call() function
1129 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
1130 if( mbuf != NULL ) {
1131 mbuf->state = RMR_ERR_BADARG;
1132 mbuf->tp_state = EINVAL;
// NULL endpoint: mt_call() sends via the route table.
1137 return mt_call( vctx, mbuf, call_id, max_wait, NULL );
1142 Given an existing message buffer, reallocate the payload portion to
1143 be at least new_len bytes. The message header will remain such that
1144 the caller may use the rmr_rts_msg() function to return a payload
1147 The mbuf passed in may or may not be reallocated and the caller must
1148 use the returned pointer and should NOT assume that it can use the
1149 pointer passed in with the exceptions based on the clone flag.
1151 If the clone flag is set, then a duplicated message, with larger payload
1152 size, is allocated and returned. The old_msg pointer in this situation is
1153 still valid and must be explicitly freed by the application. If the clone
1154 message is not set (0), then any memory management of the old message is
1155 handled by the function.
1157 If the copy flag is set, the contents of the old message's payload are
1158 copied to the reallocated payload. If the flag is not set, then the
1159 contents of the payload are undetermined.
// rmr_realloc_payload -- public wrapper around the transport-specific
// realloc_payload(). A NULL old_msg is handled up front (the statements in that
// branch are not visible in this listing); every other call is forwarded with
// its arguments unchanged.
1161 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1162 if( old_msg == NULL ) {
1166 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
1170 Enable low latency things in the transport (when supported).
// rmr_set_low_latency -- sets the SI95 transport flag SI_TF_NODELAY on the
// context's SI context. The flag is only set when vctx and its si_ctx are both
// non-NULL. (Presumably this maps to TCP_NODELAY inside SI95 -- confirm there.)
1172 extern void rmr_set_low_latency( void* vctx ) {
1175 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1176 if( ctx->si_ctx != NULL ) {
1177 SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
// rmr_set_fack -- sets the SI95 transport flag SI_TF_FASTACK on the context's
// SI context, only when vctx and its si_ctx are both non-NULL.
// NOTE(review): presumably enables fast/quick TCP acknowledgements inside
// SI95 -- confirm against the si95 implementation.
1185 extern void rmr_set_fack( void* vctx ) {
1188 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1189 if( ctx->si_ctx != NULL ) {
1190 SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );