1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the si version of the rmr
24 library (formarly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised comple using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
59 #define SI95_BUILD 1 // we drop some common functions for si
61 #include "rmr.h" // things the users see
62 #include "rmr_agnostic.h" // agnostic things (must be included before private)
63 #include "rmr_si_private.h" // things that we need too
64 #include "rmr_symtab.h"
65 #include "rmr_logging.h"
67 #include "ring_static.c" // message ring support
68 #include "rt_generic_static.c" // route table things not transport specific
69 #include "rtable_si_static.c" // route table things -- transport specific
70 #include "rtc_static.c" // route table collector (thread code)
71 #include "tools_static.c"
72 #include "sr_si_static.c" // send/receive static functions
73 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
74 #include "mt_call_static.c"
75 #include "mt_call_si_static.c"
78 //------------------------------------------------------------------------------
84 static void free_ctx( uta_ctx_t* ctx ) {
87 free( ctx->rtg_addr );
92 // --------------- public functions --------------------------------------------------------------------------
95 Returns the size of the payload (bytes) that the msg buffer references.
96 Len in a message is the number of bytes which were received, or should
97 be transmitted, however, it is possible that the mbuf was allocated
98 with a larger payload space than the payload length indicates; this
99 function returns the absolute maximum space that the user has available
100 in the payload. On error (bad msg buffer) -1 is returned and errno should
103 The allocated len stored in the msg is:
104 transport header length +
106 user requested payload
108 The msg header is a combination of the fixed RMR header and the variable
109 trace data and d2 fields which may vary for each message.
111 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
112 if( msg == NULL || msg->header == NULL ) {
118 return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN; // allocated transport size less the header and other data bits
122 Allocates a send message as a zerocopy message allowing the underlying message protocol
123 to send the buffer without copy.
125 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
129 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
133 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
139 Allocates a send message as a zerocopy message allowing the underlying message protocol
140 to send the buffer without copy. In addition, a trace data field of tr_size will be
141 added and the supplied data coppied to the buffer before returning the message to
144 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
149 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
153 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
155 state = rmr_set_trace( m, data, tr_size ); // roll their data in
156 if( state != tr_size ) {
157 m->state = RMR_ERR_INITFAILED;
165 This provides an external path to the realloc static function as it's called by an
166 outward facing mbuf api function. Used to reallocate a message with a different
169 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
170 return realloc_msg( msg, new_tr_size );
175 Return the message to the available pool, or free it outright.
177 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
178 //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
185 if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
187 free( mbuf->tp_buf );
188 mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
191 mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
197 This is a wrapper to the real timeout send. We must wrap it now to ensure that
198 the call flag and call-id are reset
200 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
201 char* d1; // point at the call-id in the header
204 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
206 d1 = DATA1_ADDR( msg->header );
207 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
210 return mtosend_msg( vctx, msg, max_to );
214 Send with default max timeout as is set in the context.
215 See rmr_mtosend_msg() for more details on the parameters.
216 See rmr_stimeout() for info on setting the default timeout.
218 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
219 char* d1; // point at the call-id in the header
222 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
224 d1 = DATA1_ADDR( msg->header );
225 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
228 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
232 Return to sender allows a message to be sent back to the endpoint where it originated.
234 With SI95 it was thought that the return to sender would be along the same open conneciton
235 and thus no table lookup would be needed to open a 'reverse direction' path. However, for
236 applications sending at high message rates, returning responses on the same connection
237 causes major strife. Thus the decision was made to use the same method as NNG and just
238 open a second connection for reverse path.
240 We will attempt to use the name in the received message to look up the endpoint. If
241 that failes, then we will write on the connection that the message arrived on as a
244 On success (state is RMR_OK, the caller may use the buffer for another receive operation),
245 and on error it can be passed back to this function to retry the send if desired. On error,
246 errno will liklely have the failure reason set by the nng send processing. The following
247 are possible values for the state in the message buffer:
249 Message states returned:
250 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
251 RMR_ERR_NOHDR - message did not have a header
252 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
253 RMR_ERR_SENDFAILED - send failed; errno has nano error code
254 RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN)
256 A nil message as the return value is rare, and generally indicates some kind of horrible
257 failure. The value of errno might give a clue as to what is wrong.
260 Like send_msg(), this is non-blocking and will return the msg if there is an error.
261 The caller must check for this and handle it properly.
263 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
264 int nn_sock; // endpoint socket for send
267 char* hold_src; // we need the original source if send fails
268 char* hold_ip; // also must hold original ip
269 int sock_ok = 0; // true if we found a valid endpoint socket
270 endpoint_t* ep = NULL; // end point to track counts
272 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
273 errno = EINVAL; // if msg is null, this is their clue
275 msg->state = RMR_ERR_BADARG;
276 msg->tp_state = errno;
281 errno = 0; // at this point any bad state is in msg returned
282 if( msg->header == NULL ) {
283 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
284 msg->state = RMR_ERR_NOHDR;
285 msg->tp_state = errno;
289 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
291 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
293 if( (nn_sock = msg->rts_fd) < 0 ) {
294 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
295 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
298 msg->state = RMR_ERR_NOENDPT;
305 msg->state = RMR_OK; // ensure it is clear before send
306 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
307 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
308 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
309 msg = send_msg( ctx, msg, nn_sock, -1 );
312 switch( msg->state ) {
314 ep->scounts[EPSC_GOOD]++;
318 ep->scounts[EPSC_TRANS]++;
322 // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed
323 ep->scounts[EPSC_FAIL]++;
327 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
328 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
329 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
338 If multi-threading call is turned on, this invokes that mechanism with the special call
339 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
340 behavour (described below) is carried out. This is safe to use when mt is enabled, but
341 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
344 On timeout this function will return a nil pointer. If the original message could not
345 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
348 Call sends the message based on message routing using the message type, and waits for a
349 response message to arrive with the same transaction id that was in the outgoing message.
350 If, while wiating for the expected response, messages are received which do not have the
351 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
352 order that they were received.
354 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
355 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
356 may be resent (likely the context pointer was nil). If the message is sent, but no
357 response is received, a nil message is returned with errno set to indicate the likley
359 ETIMEDOUT -- too many messages were queued before reciving the expected response
360 ENOBUFS -- the queued message ring is full, messages were dropped
361 EINVAL -- A parameter was not valid
362 EAGAIN -- the underlying message system wsa interrupted or the device was busy;
363 user should call this function with the message again.
366 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
369 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
371 msg->state = RMR_ERR_BADARG;
376 return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
380 The outward facing receive function. When invoked it will pop the oldest message
381 from the receive ring, if any are queued, and return it. If the ring is empty
382 then the receive function is invoked to wait for the next message to arrive (blocking).
384 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
385 nil, a new one will be allocated. However, the caller should NOT expect to get the same
386 struct back (if a queued message is returned the message struct will be different).
388 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
390 rmr_mbuf_t* qm; // message that was queued on the ring
392 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
394 if( old_msg != NULL ) {
395 old_msg->state = RMR_ERR_BADARG;
396 old_msg->tp_state = errno;
402 return rmr_mt_rcv( ctx, old_msg, -1 );
406 This allows a timeout based receive for applications unable to implement epoll_wait()
409 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
412 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
414 if( old_msg != NULL ) {
415 old_msg->state = RMR_ERR_BADARG;
416 old_msg->tp_state = errno;
421 return rmr_mt_rcv( ctx, old_msg, ms_to );
425 This blocks until the message with the 'expect' ID is received. Messages which are received
426 before the expected message are queued onto the message ring. The function will return
427 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
428 expected message is received. If the queued message ring fills a nil pointer is returned
429 and errno is set to ENOBUFS.
431 Generally this will be invoked only by the call() function as it waits for a response, but
432 it is exposed to the user application as three is no reason not to.
434 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
436 int queued = 0; // number we pushed into the ring
437 int exp_len = 0; // length of expected ID
439 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
442 msg->state = RMR_ERR_BADARG;
443 msg->tp_state = errno;
450 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
451 return rmr_rcv_msg( ctx, msg );
454 exp_len = strlen( expect );
455 if( exp_len > RMR_MAX_XID ) {
456 exp_len = RMR_MAX_XID;
458 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
460 while( queued < allow2queue ) {
461 msg = rcv_msg( ctx, msg ); // hard wait for next
462 if( msg->state == RMR_OK ) {
463 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
464 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
468 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
469 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
474 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
480 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
486 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
487 _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
488 mechnism indicates eagain or etimeedout. All other error conditions are reported
489 without this delay. Setting a timeout of 0 causes no retries to be attempted in
490 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
491 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
492 after every 1K send attempts until the "time" value is reached. Retries are abandoned
493 if NNG returns anything other than EAGAIN or EINTER is returned.
495 The default, if this function is not used, is 1; meaning that RMr will retry, but will
496 not enter a sleep. In all cases the caller should check the status in the message returned
499 Returns -1 if the context was invalid; RMR_OK otherwise.
501 extern int rmr_set_stimeout( void* vctx, int time ) {
504 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
512 ctx->send_retries = time;
517 Set receive timeout -- not supported in nng implementation
519 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
521 extern int rmr_set_rtimeout( void* vctx, int time ) {
522 rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
528 This is the actual init workhorse. The user visible function meerly ensures that the
529 calling programme does NOT set any internal flags that are supported, and then
530 invokes this. Internal functions (the route table collector) which need additional
531 open ports without starting additional route table collectors, will invoke this
532 directly with the proper flag.
534 CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
535 that we know about. The _user_ should ensure that the supplied length also
536 includes the trace data length maximum as they are in control of that.
538 static void* init( char* uproto_port, int max_msg_size, int flags ) {
539 static int announced = 0;
540 uta_ctx_t* ctx = NULL;
541 char bind_info[256]; // bind info
542 char* proto = "tcp"; // pointer into the proto/port string user supplied
544 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
546 char wbuf[1024]; // work buffer
547 char* tok; // pointer at token in a buffer
549 int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
554 old_vlevel = rmr_vlog_init(); // initialise and get the current level
555 rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
558 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
559 RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
562 rmr_set_vlevel( old_vlevel ); // return logging to the desired state
565 if( uproto_port == NULL ) {
566 proto_port = strdup( DEF_COMM_PORT );
568 proto_port = strdup( uproto_port ); // so we can modify it
571 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
575 memset( ctx, 0, sizeof( uta_ctx_t ) );
577 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
578 ctx->nrivers = 256; // number of input flows we'll manage
579 ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
580 memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
581 for( i = 0; i < ctx->nrivers; i++ ) {
582 ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
585 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
586 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
587 ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
588 ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
590 ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
591 ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
593 if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
594 uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
595 uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
597 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
599 init_mtcall( ctx ); // set up call chutes
600 fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
603 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
604 if( max_msg_size > 0 ) {
605 ctx->max_plen = max_msg_size;
608 // we're using a listener to get rtg updates, so we do NOT need this.
609 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
611 ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
612 if( ctx->si_ctx == NULL ) {
613 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
618 if( (port = strchr( proto_port, ':' )) != NULL ) {
619 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
622 *(port++) = 0; // term proto string and point at port string
623 proto = proto_port; // user supplied proto so point at it rather than default
626 port = proto_port; // assume something like "1234" was passed
629 if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
630 if( atoi( tok ) < 1 ) {
635 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
636 tok = strdup( tok ); // something we can destroy
637 if( *tok == '[' ) { // we allow an ipv6 address here
638 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
640 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
642 if( tok2 && *tok2 ) { // if it's not the end of string marker
643 *tok2 = 0; // make it so
646 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
649 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
650 rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
653 if( (tok = strchr( wbuf, '.' )) != NULL ) {
654 *tok = 0; // we don't keep domain portion
658 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
659 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
660 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
664 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
665 if( atoi( tok ) > 0 ) {
666 flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages
670 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
671 if( flags & RMRFL_NAME_ONLY ) {
672 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
674 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
675 if( ctx->my_ip == NULL ) {
676 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
677 strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
680 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
682 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
684 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
689 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
690 interface = "0.0.0.0";
693 snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
694 if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
695 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
700 // finish all flag setting before threads to keep helgrind quiet
701 ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
703 if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself)
704 ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
707 if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
708 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
711 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
712 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
717 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
718 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
726 Initialise the message routing environment. Flags are one of the UTAFL_
727 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
728 (tcp) to be used, then :port is all that is needed.
730 At the moment it seems that TCP really is the only viable protocol, but
731 we'll allow flexibility.
733 The return value is a void pointer which must be passed to most uta functions. On
734 error, a nil pointer is returned and errno should be set.
737 No user flags supported (needed) at the moment, but this provides for extension
738 without drastically changing anything. The user should invoke with RMRFL_NONE to
739 avoid any misbehavour as there are internal flags which are suported
741 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
742 return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
746 This sets the default trace length which will be added to any message buffers
747 allocated. It can be set at any time, and if rmr_set_trace() is given a
748 trace len that is different than the default allcoated in a message, the message
751 Returns 0 on failure and 1 on success. If failure, then errno will be set.
753 extern int rmr_init_trace( void* vctx, int tr_len ) {
757 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
762 ctx->trace_data_len = tr_len;
767 Return true if routing table is initialised etc. and app can send/receive.
769 extern int rmr_ready( void* vctx ) {
772 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
776 if( ctx->rtable != NULL ) {
784 This returns the message queue ring's filedescriptor which can be used for
785 calls to epoll. The user shouild NOT read, write, or close the fd.
787 Returns the file descriptor or -1 on error.
789 extern int rmr_get_rcvfd( void* vctx ) {
793 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
798 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
799 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
804 return uta_ring_getpfd( ctx->mring );
811 There isn't an si_flush() per se, but we can pause, generate
812 a context switch, which should allow the last sent buffer to
813 flow. There isn't exactly an nng_term/close either, so there
814 isn't much we can do.
816 extern void rmr_close( void* vctx ) {
819 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
825 SItp_stats( ctx->si_ctx ); // dump some interesting stats
827 // FIX ME -- how to we turn off si; close all sessions etc?
828 //SIclose( ctx->nn_sock );
833 // ----- multi-threaded call/receive support -------------------------------------------------
836 Blocks on the receive ring chute semaphore and then reads from the ring
837 when it is tickled. If max_wait is -1 then the function blocks until
838 a message is ready on the ring. Else max_wait is assumed to be the number
839 of millaseconds to wait before returning a timeout message.
841 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
843 uta_mhdr_t* hdr; // header in the transport buffer
845 struct timespec ts; // time info if we have a timeout
846 long new_ms; // adjusted mu-sec
847 long seconds = 0; // max wait seconds
848 long nano_sec; // max wait xlated to nano seconds
850 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
852 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
855 mbuf->state = RMR_ERR_BADARG;
856 mbuf->tp_state = errno;
861 ombuf = mbuf; // if we timeout we must return original msg with status, so save it
863 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
865 if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW!
866 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
868 rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
871 mbuf = ombuf; // return original if it was given with timeout status
872 if( ombuf != NULL ) {
873 mbuf->state = RMR_ERR_TIMEOUT; // preset if for failure
878 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
883 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
887 clock_gettime( CLOCK_REALTIME, &ts ); // sem timeout based on clock, not a delta
889 if( max_wait > 999 ) {
890 seconds = max_wait / 1000;
891 max_wait -= seconds * 1000;
892 ts.tv_sec += seconds;
895 nano_sec = max_wait * 1000000;
896 ts.tv_nsec += nano_sec;
897 if( ts.tv_nsec > 999999999 ) {
898 ts.tv_nsec -= 999999999;
903 seconds = 1; // use as flag later to invoked timed wait
908 while( state < 0 && errno == EINTR ) {
910 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
912 state = sem_wait( &chute->barrier );
917 mbuf = ombuf; // return caller's buffer if they passed one in
919 errno = 0; // interrupted call state could be left; clear
920 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
921 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
922 mbuf->state = RMR_OK;
923 mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
926 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
930 mbuf = ombuf; // no buffer, return user's if there
935 mbuf->tp_state = errno;
941 Accept a message buffer and caller ID, send the message and then wait
942 for the receiver to tickle the semaphore letting us know that a message
943 has been received. The call_id is a value between 2 and 255, inclusive; if
944 it's not in this range an error will be returned. Max wait is the amount
945 of time in millaseconds that the call should block for. If 0 is given
946 then no timeout is set.
948 If the mt_call feature has not been initialised, then the attempt to use this
949 funciton will fail with RMR_ERR_NOTSUPP
951 If no matching message is received before the max_wait period expires, a
952 nil pointer is returned, and errno is set to ETIMEOUT. If any other error
953 occurs after the message has been sent, then a nil pointer is returned
954 with errno set to some other value.
956 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
957 rmr_mbuf_t* ombuf; // original mbuf passed in
959 uta_mhdr_t* hdr; // header in the transport buffer
961 unsigned char* d1; // d1 data in header
962 struct timespec ts; // time info if we have a timeout
963 long new_ms; // adjusted mu-sec
964 long seconds = 0; // max wait seconds
965 long nano_sec; // max wait xlated to nano seconds
969 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
971 mbuf->tp_state = errno;
972 mbuf->state = RMR_ERR_BADARG;
977 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
978 mbuf->state = RMR_ERR_NOTSUPP;
979 mbuf->tp_state = errno;
983 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
984 mbuf->state = RMR_ERR_BADARG;
985 mbuf->tp_state = errno;
989 ombuf = mbuf; // save to return timeout status with
991 chute = &ctx->chutes[call_id];
992 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
993 rmr_free_msg( chute->mbuf );
997 hdr = (uta_mhdr_t *) mbuf->header;
998 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
999 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
1000 d1 = DATA1_ADDR( hdr );
1001 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
1002 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
1004 if( max_wait >= 0 ) {
1005 clock_gettime( CLOCK_REALTIME, &ts );
1007 if( max_wait > 999 ) {
1008 seconds = max_wait / 1000;
1009 max_wait -= seconds * 1000;
1010 ts.tv_sec += seconds;
1012 if( max_wait > 0 ) {
1013 nano_sec = max_wait * 1000000;
1014 ts.tv_nsec += nano_sec;
1015 if( ts.tv_nsec > 999999999 ) {
1016 ts.tv_nsec -= 999999999;
1021 seconds = 1; // use as flag later to invoked timed wait
1024 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1026 if( mbuf->state != RMR_OK ) {
1027 mbuf->tp_state = errno;
1028 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1034 while( chute->mbuf == NULL && ! errno ) {
1036 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1038 state = sem_wait( &chute->barrier );
1041 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1045 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1046 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1047 rmr_free_msg( chute->mbuf );
1055 return NULL; // leave errno as set by sem wait call
1059 mbuf->state = RMR_OK;
1066 Given an existing message buffer, reallocate the payload portion to
1067 be at least new_len bytes. The message header will remain such that
1068 the caller may use the rmr_rts_msg() function to return a payload
1071 The mbuf passed in may or may not be reallocated and the caller must
1072 use the returned pointer and should NOT assume that it can use the
1073 pointer passed in with the exceptions based on the clone flag.
1075 If the clone flag is set, then a duplicated message, with larger payload
1076 size, is allocated and returned. The old_msg pointer in this situation is
1077 still valid and must be explicitly freed by the application. If the clone
1078 message is not set (0), then any memory management of the old message is
1079 handled by the function.
1081 If the copy flag is set, the contents of the old message's payload is
1082 copied to the reallocated payload. If the flag is not set, then the
1083 contents of the payload is undetermined.
1085 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1086 if( old_msg == NULL ) {
1090 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
1094 Enable low latency things in the transport (when supported).
1096 extern void rmr_set_low_latency( void* vctx ) {
1099 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1100 if( ctx->si_ctx != NULL ) {
1101 SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1109 extern void rmr_set_fack( void* vctx ) {
1112 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1113 if( ctx->si_ctx != NULL ) {
1114 SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );