1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the si version of the rmr
24 library (formerly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised compile using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
59 #define SI95_BUILD 1 // we drop some common functions for si
61 #include "rmr.h" // things the users see
62 #include "rmr_agnostic.h" // agnostic things (must be included before private)
63 #include "rmr_si_private.h" // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
67 #include "ring_static.c" // message ring support
68 #include "rt_generic_static.c" // route table things not transport specific
69 #include "rtable_si_static.c" // route table things -- transport specific
71 #include "rtc_static.c" // route table collector (thread code)
72 #include "tools_static.c"
73 #include "sr_si_static.c" // send/receive static functions
74 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
75 #include "mt_call_static.c"
76 #include "mt_call_si_static.c"
79 //------------------------------------------------------------------------------
// Release the resources hanging off of a context: the rtg_addr buffer, both
// message rings (mring and zcb_mring), the fd2ep symtab and the hash owned by
// the route table.
// NOTE(review): lines between the statements below (guards, closing braces) are
// not visible in this extract; confirm that ctx and ctx->rtable are checked for
// nil before ctx->rtable->hash is referenced.
85 static void free_ctx( uta_ctx_t* ctx ) {
88 free( ctx->rtg_addr );
90 uta_ring_free( ctx->mring );
91 uta_ring_free( ctx->zcb_mring );
96 rmr_sym_free( ctx->fd2ep );
105 rmr_sym_free( ctx->rtable->hash );
115 // --------------- public functions --------------------------------------------------------------------------
118 Returns the size of the payload (bytes) that the msg buffer references.
119 Len in a message is the number of bytes which were received, or should
120 be transmitted, however, it is possible that the mbuf was allocated
121 with a larger payload space than the payload length indicates; this
122 function returns the absolute maximum space that the user has available
123 in the payload. On error (bad msg buffer) -1 is returned and errno should
126 The allocated len stored in the msg is:
127 transport header length +
129 user requested payload
131 The msg header is a combination of the fixed RMR header and the variable
132 trace data and d2 fields which may vary for each message.
// Compute the payload space available to the user: the allocated length less
// the RMR header length (RMR_HDR_LEN of this message's header) and the
// transport header length (TP_HDR_LEN).
// A nil msg, or a msg with a nil header, takes the error branch; the body of
// that branch is not visible in this extract.
134 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
135 if( msg == NULL || msg->header == NULL ) {
141 return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN; // allocated transport size less the header and other data bits
145 Allocates a send message as a zerocopy message allowing the underlying message protocol
146 to send the buffer without copy.
// Allocate a message through alloc_zcmsg() using the requested payload size and
// the default trace length (DEF_TR_LEN).
// vctx is the context pointer handed to the caller at init time; a nil context
// takes the early-exit branch. That branch's body, the local declarations and
// the final return are not visible in this extract.
148 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
152 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
156 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
162 Allocates a send message as a zerocopy message allowing the underlying message protocol
163 to send the buffer without copy. In addition, a trace data field of tr_size will be
164 added and the supplied data copied to the buffer before returning the message to
// Allocate a message through alloc_zcmsg() with a caller supplied trace size,
// then copy the caller's trace data into it with rmr_set_trace(). If
// rmr_set_trace() reports a length other than tr_size the message state is set
// to RMR_ERR_INITFAILED.
// NOTE(review): m is dereferenced on the failure path; confirm that the lines
// missing from this extract check alloc_zcmsg() for a nil return first.
167 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
172 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
176 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
178 state = rmr_set_trace( m, data, tr_size ); // roll their data in
179 if( state != tr_size ) {
180 m->state = RMR_ERR_INITFAILED;
188 This provides an external path to the realloc static function as it's called by an
189 outward facing mbuf api function. Used to reallocate a message with a different
192 User programmes must use this with CAUTION! The mbuf passed in is NOT freed and
193 is still valid following this call. The caller is responsible for maintaining
194 a pointer to both old and new messages and invoking rmr_free_msg() on both!
// Public pass-through: hands msg and new_tr_size to the static realloc_msg()
// and returns whatever it returns. No argument checking is done here.
196 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
197 return realloc_msg( msg, new_tr_size );
202 Return the message to the available pool, or free it outright.
// Dispose of a message buffer.
// Two disposal paths are visible below:
//   - a pooling path which tries to put the mbuf back on its ring and only
//     frees the transport buffer when the mbuf is huge, has no ring, or the
//     ring insert fails;
//   - an "always free" path which releases the transport buffer unconditionally.
// NOTE(review): the construct selecting between the two paths (presumably a
// preprocessor conditional) and the nil-mbuf test guarding the fprintf() are
// not visible in this extract.
204 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
// NOTE(review): diagnostic written straight to stderr rather than via rmr_vlog()
206 fprintf( stderr, ">>>FREE nil buffer\n" );
211 if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
212 ! mbuf->ring || // can't cache if no ring
213 ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
216 free( mbuf->tp_buf );
217 mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
220 mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
224 // always free, never manage a pool
226 free( mbuf->tp_buf );
227 mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
230 mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
236 This is a wrapper to the real timeout send. We must wrap it now to ensure that
237 the call flag and call-id are reset
// Public send with an explicit max timeout. Before handing the message to the
// static mtosend_msg() the call flag (HFL_CALL_MSG) is cleared in the header
// and the call-id byte in the d1 area is set to NO_CALL_ID, so that a buffer
// previously used for a call is sent as an ordinary message.
// NOTE(review): msg and msg->header are dereferenced below; confirm that the
// lines missing from this extract guard against a nil msg.
239 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
240 char* d1; // point at the call-id in the header
243 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
245 d1 = DATA1_ADDR( msg->header );
246 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
249 return mtosend_msg( vctx, msg, max_to );
253 Send with default max timeout as is set in the context.
254 See rmr_mtosend_msg() for more details on the parameters.
255 See rmr_stimeout() for info on setting the default timeout.
// Public send using the default timeout: clears the call flag and call-id in
// the header and then invokes rmr_mtosend_msg() with a max timeout of -1.
// NOTE(review): rmr_mtosend_msg() performs the same flag/call-id reset, so the
// reset here looks redundant. Also confirm that the lines missing from this
// extract guard against a nil msg before msg->header is referenced.
257 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
258 char* d1; // point at the call-id in the header
261 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
263 d1 = DATA1_ADDR( msg->header );
264 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
267 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
271 Return to sender allows a message to be sent back to the endpoint where it originated.
273 With SI95 it was thought that the return to sender would be along the same open connection
274 and thus no table lookup would be needed to open a 'reverse direction' path. However, for
275 applications sending at high message rates, returning responses on the same connection
276 causes major strife. Thus the decision was made to use the same method as NNG and just
277 open a second connection for reverse path.
279 We will attempt to use the name in the received message to look up the endpoint. If
280 that fails, then we will write on the connection that the message arrived on as a
283 On success (state is RMR_OK, the caller may use the buffer for another receive operation),
284 and on error it can be passed back to this function to retry the send if desired. On error,
285 errno will likely have the failure reason set by the nng send processing. The following
286 are possible values for the state in the message buffer:
288 Message states returned:
289 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
290 RMR_ERR_NOHDR - message did not have a header
291 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
292 RMR_ERR_SENDFAILED - send failed; errno has nano error code
293 RMR_ERR_RETRY - the request failed but should be retried (EAGAIN)
295 A nil message as the return value is rare, and generally indicates some kind of horrible
296 failure. The value of errno might give a clue as to what is wrong.
299 Like send_msg(), this is non-blocking and will return the msg if there is an error.
300 The caller must check for this and handle it properly.
// Return a received message to its sender.
// Visible flow:
//   1. validate ctx/msg and the presence of a header, recording the failure in
//      msg->state / msg->tp_state;
//   2. clear the call flag, then look for a socket: first by the src name in
//      the header, then the fd the message arrived on (msg->rts_fd), then (for
//      header versions > 2) by the srcip in the header; RMR_ERR_NOENDPT is set
//      when nothing is found;
//   3. save the original src and srcip, overlay src with our own name, send;
//   4. bump the endpoint's good/transient/fail counter based on the send state;
//   5. put the original src and srcip back so that the caller may retry, and
//      flag the buffer with MFL_ADDSRC.
// Several lines (returns, else branches, case labels, closing braces) are not
// visible in this extract, so the exact nesting below must be confirmed against
// the full source.
302 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
303 int nn_sock; // endpoint socket for send
305 char* hold_src; // we need the original source if send fails
306 char* hold_ip; // also must hold original ip
307 int sock_ok = 0; // true if we found a valid endpoint socket
308 endpoint_t* ep = NULL; // end point to track counts
310 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
311 errno = EINVAL; // if msg is null, this is their clue
313 msg->state = RMR_ERR_BADARG;
314 msg->tp_state = errno;
319 errno = 0; // at this point any bad state is in msg returned
320 if( msg->header == NULL ) {
// NOTE(review): the log text names rmr_send_msg although this is rmr_rts_msg
321 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
322 msg->state = RMR_ERR_NOHDR;
323 msg->tp_state = errno;
327 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
329 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
// fall back on the fd that the message was received on when the name lookup did not produce a socket
331 if( (nn_sock = msg->rts_fd) < 0 ) {
332 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
333 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
336 msg->state = RMR_ERR_NOENDPT;
343 msg->state = RMR_OK; // ensure it is clear before send
// NOTE(review): both strdup() results are heap copies; confirm that the lines
// missing from this extract check them for nil and free them on every path.
344 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
345 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
346 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
347 msg = send_msg( ctx, msg, nn_sock, -1 );
// NOTE(review): ep is initialised to NULL and is only set by uta_epsock_byname();
// when the rts_fd fallback supplied the socket ep may still be nil here. Confirm
// that the counter updates below are guarded.
350 switch( msg->state ) {
352 ep->scounts[EPSC_GOOD]++;
356 ep->scounts[EPSC_TRANS]++;
360 // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed
361 ep->scounts[EPSC_FAIL]++;
365 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always replace original source & ip so rts can be called again
366 zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
367 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
376 If multi-threading call is turned on, this invokes that mechanism with the special call
377 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
378 behaviour (described below) is carried out. This is safe to use when mt is enabled, but
379 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
382 On timeout this function will return a nil pointer. If the original message could not
383 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
386 Call sends the message based on message routing using the message type, and waits for a
387 response message to arrive with the same transaction id that was in the outgoing message.
388 If, while waiting for the expected response, messages are received which do not have the
389 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
390 order that they were received.
392 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
393 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
394 may be resent (likely the context pointer was nil). If the message is sent, but no
395 response is received, a nil message is returned with errno set to indicate the likely
397 ETIMEDOUT -- too many messages were queued before receiving the expected response
398 ENOBUFS -- the queued message ring is full, messages were dropped
399 EINVAL -- A parameter was not valid
400 EAGAIN -- the underlying message system was interrupted or the device was busy;
401 user should call this function with the message again.
// Blocking call: after validating ctx and msg this delegates to mt_call() with
// the reserved call-id of 1, a max wait of 1000 (ms) and no explicit endpoint.
// On a bad argument the message state is set to RMR_ERR_BADARG.
// NOTE(review): the bad-argument test is true when msg is nil as well; confirm
// that the lines missing from this extract test msg before msg->state is set.
404 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
407 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
409 msg->state = RMR_ERR_BADARG;
414 return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec
418 The outward facing receive function. When invoked it will pop the oldest message
419 from the receive ring, if any are queued, and return it. If the ring is empty
420 then the receive function is invoked to wait for the next message to arrive (blocking).
422 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
423 nil, a new one will be allocated. However, the caller should NOT expect to get the same
424 struct back (if a queued message is returned the message struct will be different).
// Blocking receive: validates the context and then delegates to rmr_mt_rcv()
// with a max wait of -1 (block until a message is available). With a nil
// context, old_msg (when supplied) is marked RMR_ERR_BADARG with tp_state set
// from errno.
// NOTE(review): qm is not referenced in any visible line of this function.
426 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
428 rmr_mbuf_t* qm; // message that was queued on the ring
430 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
432 if( old_msg != NULL ) {
433 old_msg->state = RMR_ERR_BADARG;
434 old_msg->tp_state = errno;
440 return rmr_mt_rcv( ctx, old_msg, -1 );
444 This allows a timeout based receive for applications unable to implement epoll_wait()
// Receive with a timeout: validates the context and then delegates to
// rmr_mt_rcv(), passing ms_to through as the max wait. With a nil context,
// old_msg (when supplied) is marked RMR_ERR_BADARG with tp_state set from errno.
447 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
450 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
452 if( old_msg != NULL ) {
453 old_msg->state = RMR_ERR_BADARG;
454 old_msg->tp_state = errno;
459 return rmr_mt_rcv( ctx, old_msg, ms_to );
463 DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
464 too. This function likely will not behave as expected in SI, and we are pretty sure it
465 isn't being used as there was an abort triggering reference to rmr_rcv() until now.
467 This blocks until the message with the 'expect' ID is received. Messages which are received
468 before the expected message are queued onto the message ring. The function will return
469 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
470 expected message is received. If the queued message ring fills a nil pointer is returned
471 and errno is set to ENOBUFS.
473 Generally this will be invoked only by the call() function as it waits for a response, but
474 it is exposed to the user application as there is no reason not to.
// Receive until a message whose transaction id starts with 'expect' arrives.
//   - A nil or empty 'expect' degenerates to a plain rmr_rcv_msg().
//   - At most RMR_MAX_XID bytes of 'expect' are compared (memcmp on msg->xaction).
//   - Non-matching messages with state RMR_OK are inserted onto ctx->mring;
//     the loop is bounded by allow2queue.
// Returns, counter updates and closing braces are not visible in this extract.
// NOTE(review): msg is dereferenced in the bad-context branch and right after
// rmr_rcv_msg(); confirm that the missing lines test msg for nil in both places.
476 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
478 int queued = 0; // number we pushed into the ring
479 int exp_len = 0; // length of expected ID
481 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
484 msg->state = RMR_ERR_BADARG;
485 msg->tp_state = errno;
492 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
493 return rmr_rcv_msg( ctx, msg );
// clamp the compare length so that memcmp() never looks past the xaction field
496 exp_len = strlen( expect );
497 if( exp_len > RMR_MAX_XID ) {
498 exp_len = RMR_MAX_XID;
500 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
502 while( queued < allow2queue ) {
503 msg = rmr_rcv_msg( ctx, msg ); // hard wait for next
505 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state );
506 if( msg->state == RMR_OK ) {
507 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
508 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
512 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
513 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
518 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
525 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
531 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
532 _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
533 mechanism indicates eagain or etimedout. All other error conditions are reported
534 without this delay. Setting a timeout of 0 causes no retries to be attempted in
535 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
536 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
537 after every 1K send attempts until the "time" value is reached. Retries are abandoned
538 if NNG returns anything other than EAGAIN or EINTR.
540 The default, if this function is not used, is 1; meaning that RMr will retry, but will
541 not enter a sleep. In all cases the caller should check the status in the message returned
544 Returns -1 if the context was invalid; RMR_OK otherwise.
// Record the send timeout in the context: the supplied value is stored in
// ctx->send_retries. A nil context takes the error branch; its body, any
// range adjustment of 'time' and the return statements are not visible in this
// extract.
546 extern int rmr_set_stimeout( void* vctx, int time ) {
549 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
557 ctx->send_retries = time;
562 Set receive timeout -- not supported in nng implementation
564 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
// Receive timeouts are not supported by the SI transport; the only visible
// action is a warning written to the log. Neither argument is used in the
// visible lines and the return statement is not visible in this extract.
566 extern int rmr_set_rtimeout( void* vctx, int time ) {
567 rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
573 This is the actual init workhorse. The user visible function merely ensures that the
574 calling programme does NOT set any internal flags that are supported, and then
575 invokes this. Internal functions (the route table collector) which need additional
576 open ports without starting additional route table collectors, will invoke this
577 directly with the proper flag.
579 CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
580 that we know about. The _user_ should ensure that the supplied length also
581 includes the trace data length maximum as they are in control of that.
// Build and start a context. Visible steps, in order:
//   1. initialise logging and announce the library version;
//   2. copy the user's proto:port string (or the default) so it can be modified;
//   3. allocate and zero the context, the rivers array, the rings, the call
//      chutes and the fd2ep symtab;
//   4. initialise SI95 and split proto/port;
//   5. work out our name (ENV_SRC_ID or the short hostname) and default IP;
//   6. start the SI listener on interface:port;
//   7. set up the route table pieces and start the route table collector and
//      receiver threads.
// Error-path bodies, several declarations, else branches, the final return and
// all closing braces are not visible in this extract; cleanup on the failure
// paths must be confirmed against the full source.
583 static void* init( char* uproto_port, int def_msg_size, int flags ) {
584 static int announced = 0;
585 uta_ctx_t* ctx = NULL;
586 char bind_info[256]; // bind info
587 char* proto = "tcp"; // pointer into the proto/port string user supplied
588 char* port; // pointer into the proto_port buffer at the port value
589 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
591 char wbuf[1024]; // work buffer
592 char* tok; // pointer at token in a buffer
594 int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
// ---- logging and version announcement ----
599 old_vlevel = rmr_vlog_init(); // initialise and get the current level
602 rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version
603 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
604 uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
607 rmr_set_vlevel( old_vlevel ); // return logging to the desired state
608 uta_dump_env(); // spit out environment settings meaningful to us if in info mode
// ---- private, writable copy of the proto:port string ----
612 if( uproto_port == NULL ) {
613 proto_port = strdup( DEF_COMM_PORT );
614 rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
616 proto_port = strdup( uproto_port ); // so we can modify it
619 if ( proto_port == NULL ){
// ---- context allocation and defaults ----
624 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
628 memset( ctx, 0, sizeof( uta_ctx_t ) );
// NOTE(review): the debug text hard codes "266" while the count used is MAX_RIVERS
630 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
631 ctx->snarf_rt_fd = -1;
632 ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max
// NOTE(review): the malloc() result is passed to memset() below without a visible nil check
633 ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
634 ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to be hashed
635 memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
636 for( i = 0; i < ctx->nrivers; i++ ) {
637 ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
// NOTE(review): this comment says ~10K retries; the rmr_set_stimeout() description says 1K -- confirm which is right
640 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
641 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
642 ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt
643 ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
645 ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
646 ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
648 if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
649 uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
650 uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
651 uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocation calls from userland require read lock, but can be fast
653 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
655 init_mtcall( ctx ); // set up call chutes
656 fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
659 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload length
660 if( def_msg_size > 0 ) {
661 ctx->max_plen = def_msg_size;
// ---- SI95 initialisation and proto/port split ----
664 ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
665 if( ctx->si_ctx == NULL ) {
666 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
670 if( (port = strchr( proto_port, ':' )) != NULL ) {
671 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
674 *(port++) = 0; // term proto string and point at port string
675 proto = proto_port; // user supplied proto so point at it rather than default
678 port = proto_port; // assume something like "1234" was passed
680 rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
682 if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
// ---- our name: env override, else short hostname ----
686 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
// NOTE(review): confirm that this strdup() copy is checked for nil and freed in the lines missing from this extract
687 tok = strdup( tok ); // something we can destroy
688 if( *tok == '[' ) { // we allow an ipv6 address here
// NOTE(review): if the value starts with '[' but has no ']' strchr() returns nil,
// and adding 1 yields an invalid non-nil pointer which the test below dereferences.
689 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
691 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
693 if( tok2 && *tok2 ) { // if it's not the end of string marker
694 *tok2 = 0; // make it so
697 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
700 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
701 rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
704 if( (tok = strchr( wbuf, '.' )) != NULL ) {
705 *tok = 0; // we don't keep domain portion
// NOTE(review): the malloc() result is handed to snprintf() without a visible nil check
709 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
710 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
711 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
716 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
717 if( atoi( tok ) > 0 ) {
718 flags |= RMRFL_NAME_ONLY; // don't allow IP address to go out in messages
// ---- default IP placed into outbound messages ----
722 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
723 if( flags & RMRFL_NAME_ONLY ) {
724 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
726 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
727 if( ctx->my_ip == NULL ) {
728 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
729 ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
732 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
734 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
736 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
// ---- listener ----
741 if( (interface = getenv( ENV_BIND_IF )) == NULL ) { // if specific interface not defined, listen on all
742 interface = "0.0.0.0";
745 snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
746 if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
747 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
751 // finish all flag setting before threads to keep helgrind quiet
752 ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
755 // ---------------- setup for route table collector before invoking ----------------------------------
756 ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) ); // single mutex required to gate access to moving rtables
757 if( ctx->rtgate != NULL ) {
758 pthread_mutex_init( ctx->rtgate, NULL );
761 ctx->ephash = rmr_sym_alloc( 129 ); // host:port to ep symtab exists outside of any route table
762 if( ctx->ephash == NULL ) {
763 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
768 ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used
769 if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases
770 ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case
772 ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread
// ---- threads: route table collector (file based or dynamic), then the receiver ----
// A failed pthread_create() is only logged as a warning in the visible lines.
775 rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
776 if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
777 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
780 rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
781 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
782 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
787 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
788 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
801 Initialise the message routing environment. Flags are one of the UTAFL_
802 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
803 (tcp) to be used, then :port is all that is needed.
805 At the moment it seems that TCP really is the only viable protocol, but
806 we'll allow flexibility.
808 The return value is a void pointer which must be passed to most uta functions. On
809 error, a nil pointer is returned and errno should be set.
812 No user flags supported (needed) at the moment, but this provides for extension
813 without drastically changing anything. The user should invoke with RMRFL_NONE to
814 avoid any misbehaviour as there are internal flags which are supported
// Public entry point for initialisation: masks the caller's flags with UFL_MASK
// and passes everything else through to the static init().
// Returns whatever init() returns.
816 extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
817 return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
821 This sets the default trace length which will be added to any message buffers
822 allocated. It can be set at any time, and if rmr_set_trace() is given a
823 trace len that is different than the default allocated in a message, the message
826 Returns 0 on failure and 1 on success. If failure, then errno will be set.
// Store tr_len as the context's default trace data length (ctx->trace_data_len).
// A nil context takes the error branch; that branch's body and the return
// statements are not visible in this extract.
828 extern int rmr_init_trace( void* vctx, int tr_len ) {
832 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
837 ctx->trace_data_len = tr_len;
842 Return true if routing table is initialised etc. and app can send/receive.
// Report the context's rtable_ready flag. A nil context takes the early-exit
// branch, whose body is not visible in this extract.
844 extern int rmr_ready( void* vctx ) {
847 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
851 return ctx->rtable_ready;
855 This returns the message queue ring's filedescriptor which can be used for
856 calls to epoll. The user should NOT read, write, or close the fd.
858 Returns the file descriptor or -1 on error.
// Return the value of uta_ring_getpfd() for the context's message ring
// (ctx->mring). A nil context takes the early-exit branch, whose body is not
// visible in this extract.
860 extern int rmr_get_rcvfd( void* vctx ) {
864 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
868 return uta_ring_getpfd( ctx->mring );
875 There isn't an si_flush() per se, but we can pause, generate
876 a context switch, which should allow the last sent buffer to
877 flow. There isn't exactly an nng_term/close either, so there
878 isn't much we can do.
// Close-time processing. The only active call visible is SItp_stats() on the
// SI context; the SIclose() call is commented out, so no sessions are closed
// by the visible lines. A nil context takes the early-exit branch, whose body
// is not visible in this extract.
880 extern void rmr_close( void* vctx ) {
883 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
889 SItp_stats( ctx->si_ctx ); // dump some interesting stats
891 // FIX ME -- how to we turn off si; close all sessions etc?
892 //SIclose( ctx->nn_sock );
897 // ----- multi-threaded call/receive support -------------------------------------------------
900 Blocks on the receive ring chute semaphore and then reads from the ring
901 when it is tickled. If max_wait is -1 then the function blocks until
902 a message is ready on the ring. Else max_wait is assumed to be the number
903 of milliseconds to wait before returning a timeout message.
// Receive from the context's message ring, gated by the semaphore of chute 0.
//   max_wait == 0  one-shot poll: the ring is tried directly, and the semaphore
//                  count is consumed only when a message was extracted.
//   otherwise      waits on the semaphore (sem_timedwait() or sem_wait(); the
//                  test choosing between them is not visible here) and then
//                  extracts from the ring.
// The caller's buffer (ombuf) is freed when a queued message is returned, and
// is handed back with RMR_ERR_TIMEOUT state when nothing arrived.
// Declarations of ctx/chute/state, else branches, returns and closing braces
// are not visible in this extract.
905 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
908 struct timespec ts; // time info if we have a timeout
// NOTE(review): new_ms is not referenced in any visible line of this function
909 long new_ms; // adjusted mu-sec
910 long seconds = 0; // max wait seconds
911 long nano_sec; // max wait xlated to nano seconds
913 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
915 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// NOTE(review): confirm mbuf is tested for nil (in a line missing from this extract) before these stores
918 mbuf->state = RMR_ERR_BADARG;
919 mbuf->tp_state = errno;
924 ombuf = mbuf; // if we timeout we must return original msg with status, so save it
926 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
928 if( max_wait == 0 ) { // one shot poll; handle without sem check as that is SLOW!
929 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
930 clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expiry time
931 sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop)
933 rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
936 mbuf = ombuf; // return original if it was given with timeout status
937 if( ombuf != NULL ) {
938 mbuf->state = RMR_ERR_TIMEOUT; // preset if for failure
944 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
951 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
// build an absolute expiry time: whole seconds first, then the remaining milliseconds as nanoseconds
955 clock_gettime( CLOCK_REALTIME, &ts ); // sem timeout based on clock, not a delta
957 if( max_wait > 999 ) {
958 seconds = max_wait / 1000;
959 max_wait -= seconds * 1000;
960 ts.tv_sec += seconds;
963 nano_sec = max_wait * 1000000;
964 ts.tv_nsec += nano_sec;
// NOTE(review): one second is 1000000000 ns; subtracting 999999999 leaves the
// expiry 1ns late and, when tv_nsec was exactly 1000000000, yields 1 rather than 0.
965 if( ts.tv_nsec > 999999999 ) {
966 ts.tv_nsec -= 999999999;
971 seconds = 1; // use as flag later to invoked timed wait
// retry the wait for as long as it is cut short by a signal
976 while( state < 0 && errno == EINTR ) {
978 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
980 state = sem_wait( &chute->barrier );
985 mbuf = ombuf; // return caller's buffer if they passed one in
987 errno = 0; // interrupted call state could be left; clear
988 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
989 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
990 mbuf->state = RMR_OK;
991 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
994 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
998 mbuf = ombuf; // no buffer, return user's if there
1003 mbuf->tp_state = errno;
/*
	This is the work horse for the multi-threaded call() function. It supports
	both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
	for rmr_mt_call() modulo the caveat below.

	If endpoint is given, then we assume that we're not doing normal route table
	routing and that we should send directly to that endpoint (probably a
	wormhole).
*/
1020 static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
1021 rmr_mbuf_t* ombuf; // original mbuf passed in
1023 uta_mhdr_t* hdr; // header in the transport buffer
1025 unsigned char* d1; // d1 data in header
1026 struct timespec ts; // time info if we have a timeout
1027 long new_ms; // adjusted mu-sec
1028 long seconds = 0; // max wait seconds
1029 long nano_sec; // max wait xlated to nano seconds
1033 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1035 mbuf->tp_state = errno;
1036 mbuf->state = RMR_ERR_BADARG;
1041 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1042 mbuf->state = RMR_ERR_NOTSUPP;
1043 mbuf->tp_state = errno;
1047 ombuf = mbuf; // save to return timeout status with
1049 chute = &ctx->chutes[call_id];
1050 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
1051 rmr_free_msg( chute->mbuf );
1055 hdr = (uta_mhdr_t *) mbuf->header;
1056 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
1057 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
1058 d1 = DATA1_ADDR( hdr );
1059 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
1060 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
1062 if( max_wait >= 0 ) {
1063 clock_gettime( CLOCK_REALTIME, &ts );
1065 if( max_wait > 999 ) {
1066 seconds = max_wait / 1000;
1067 max_wait -= seconds * 1000;
1068 ts.tv_sec += seconds;
1070 if( max_wait > 0 ) {
1071 nano_sec = max_wait * 1000000;
1072 ts.tv_nsec += nano_sec;
1073 if( ts.tv_nsec > 999999999 ) {
1074 ts.tv_nsec -= 999999999;
1079 seconds = 1; // use as flag later to invoked timed wait
1082 if( ep == NULL ) { // normal routing
1083 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1085 mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
1088 if( mbuf->state != RMR_OK ) {
1089 mbuf->tp_state = errno;
1090 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1096 while( chute->mbuf == NULL && ! errno ) {
1098 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1100 state = sem_wait( &chute->barrier );
1103 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1107 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1108 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1109 rmr_free_msg( chute->mbuf );
1117 return NULL; // leave errno as set by sem wait call
1121 if( mbuf != NULL ) {
1122 mbuf->state = RMR_OK;
/*
	Accept a message buffer and caller ID, send the message and then wait
	for the receiver to tickle the semaphore letting us know that a message
	has been received. The call_id is a value between 2 and 255, inclusive; if
	it's not in this range an error will be returned. Max wait is the amount
	of time in milliseconds that the call should block for. If 0 is given
	then no timeout is set.

	If the mt_call feature has not been initialised, then the attempt to use this
	function will fail with RMR_ERR_NOTSUPP.

	If no matching message is received before the max_wait period expires, a
	nil pointer is returned, and errno is set to ETIMEDOUT. If any other error
	occurs after the message has been sent, then a nil pointer is returned
	with errno set to some other value.

	This is now just an outward facing wrapper so we can support wormhole calls.
*/
1147 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
1149 // must vet call_id here, all others vetted by workhorse mt_call() function
1150 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
1151 if( mbuf != NULL ) {
1152 mbuf->state = RMR_ERR_BADARG;
1153 mbuf->tp_state = EINVAL;
1158 return mt_call( vctx, mbuf, call_id, max_wait, NULL );
/*
	Given an existing message buffer, reallocate the payload portion to
	be at least new_len bytes. The message header will remain such that
	the caller may use the rmr_rts_msg() function to return a payload
	to the sender.

	The mbuf passed in may or may not be reallocated and the caller must
	use the returned pointer and should NOT assume that it can use the
	pointer passed in with the exceptions based on the clone flag.

	If the clone flag is set, then a duplicated message, with larger payload
	size, is allocated and returned. The old_msg pointer in this situation is
	still valid and must be explicitly freed by the application. If the clone
	flag is not set (0), then any memory management of the old message is
	handled by the function.

	If the copy flag is set, the contents of the old message's payload is
	copied to the reallocated payload. If the flag is not set, then the
	contents of the payload is undetermined.
*/
1182 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1183 if( old_msg == NULL ) {
1187 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
/*
	Enable low latency things in the transport (when supported).
*/
1193 extern void rmr_set_low_latency( void* vctx ) {
1196 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1197 if( ctx->si_ctx != NULL ) {
1198 SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1206 extern void rmr_set_fack( void* vctx ) {
1209 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1210 if( ctx->si_ctx != NULL ) {
1211 SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );