1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the si version of the rmr
24 library (formerly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised compile using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
60 #include "rmr.h" // things the users see
61 #include "rmr_agnostic.h" // agnostic things (must be included before private)
62 #include "rmr_si_private.h" // things that we need too
63 #include "rmr_symtab.h"
64 #include "rmr_logging.h"
66 #include "ring_static.c" // message ring support
67 #include "rt_generic_static.c" // route table things not transport specific
68 #include "rtable_si_static.c" // route table things -- transport specific
69 #include "rtc_si_static.c" // specific RMR only route table collector (SI only for now)
70 #include "tools_static.c"
71 #include "sr_si_static.c" // send/receive static functions
72 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
73 #include "mt_call_static.c"
74 #include "mt_call_si_static.c"
77 //------------------------------------------------------------------------------
83 static void free_ctx( uta_ctx_t* ctx ) {
86 free( ctx->rtg_addr );
91 // --------------- public functions --------------------------------------------------------------------------
94 Returns the size of the payload (bytes) that the msg buffer references.
95 Len in a message is the number of bytes which were received, or should
96 be transmitted, however, it is possible that the mbuf was allocated
97 with a larger payload space than the payload length indicates; this
98 function returns the absolute maximum space that the user has available
99 in the payload. On error (bad msg buffer) -1 is returned and errno should
102 The allocated len stored in the msg is:
103 transport header length +
105 user requested payload
107 The msg header is a combination of the fixed RMR header and the variable
108 trace data and d2 fields which may vary for each message.
110 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
111 if( msg == NULL || msg->header == NULL ) {
117 return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN; // allocated transport size less the header and other data bits
121 Allocates a send message as a zerocopy message allowing the underlying message protocol
122 to send the buffer without copy.
124 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
128 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
132 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
138 Allocates a send message as a zerocopy message allowing the underlying message protocol
139 to send the buffer without copy. In addition, a trace data field of tr_size will be
140 added and the supplied data coppied to the buffer before returning the message to
143 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
148 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
152 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
154 state = rmr_set_trace( m, data, tr_size ); // roll their data in
155 if( state != tr_size ) {
156 m->state = RMR_ERR_INITFAILED;
164 This provides an external path to the realloc static function as it's called by an
165 outward facing mbuf api function. Used to reallocate a message with a different
168 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
169 return realloc_msg( msg, new_tr_size );
174 Return the message to the available pool, or free it outright.
176 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
177 //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
184 if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
186 free( mbuf->tp_buf );
193 This is a wrapper to the real timeout send. We must wrap it now to ensure that
194 the call flag and call-id are reset
196 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
197 char* d1; // point at the call-id in the header
200 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
202 d1 = DATA1_ADDR( msg->header );
203 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
206 return mtosend_msg( vctx, msg, max_to );
210 Send with default max timeout as is set in the context.
211 See rmr_mtosend_msg() for more details on the parameters.
212 See rmr_stimeout() for info on setting the default timeout.
214 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
215 char* d1; // point at the call-id in the header
218 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
220 d1 = DATA1_ADDR( msg->header );
221 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
224 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
228 Return to sender allows a message to be sent back to the endpoint where it originated.
230 In the SI world the file descriptor that was the source of the message is captured in
231 the mbuffer and thus can be used to quickly find the target for an RTS call.
233 The source information in the message is used to select the socket on which to write
234 the message rather than using the message type and round-robin selection. This
235 should return a message buffer with the state of the send operation set. On success
236 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
237 error it can be passed back to this function to retry the send if desired. On error,
238 errno will liklely have the failure reason set by the nng send processing.
239 The following are possible values for the state in the message buffer:
241 Message states returned:
242 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
243 RMR_ERR_NOHDR - message did not have a header
244 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
245 RMR_ERR_SENDFAILED - send failed; errno has nano error code
246 RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN)
248 A nil message as the return value is rare, and generally indicates some kind of horrible
249 failure. The value of errno might give a clue as to what is wrong.
252 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
253 The caller must check for this and handle it properly.
255 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
256 int nn_sock; // endpoint socket for send
259 char* hold_src; // we need the original source if send fails
260 char* hold_ip; // also must hold original ip
261 int sock_ok = 0; // true if we found a valid endpoint socket
262 endpoint_t* ep = NULL; // end point to track counts
264 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
265 errno = EINVAL; // if msg is null, this is their clue
267 msg->state = RMR_ERR_BADARG;
268 msg->tp_state = errno;
273 errno = 0; // at this point any bad state is in msg returned
274 if( msg->header == NULL ) {
275 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
276 msg->state = RMR_ERR_NOHDR;
277 msg->tp_state = errno;
281 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
284 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx ); // src is always used first for rts
287 if( (nn_sock = msg->rts_fd) < 0 ) {
288 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
289 //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
290 sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
293 msg->state = RMR_ERR_NOENDPT;
294 return msg; // preallocated msg can be reused since not given back to nn
298 msg->state = RMR_OK; // ensure it is clear before send
299 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
300 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
301 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
302 msg = send_msg( ctx, msg, nn_sock, -1 );
305 switch( msg->state ) {
307 ep->scounts[EPSC_GOOD]++;
311 ep->scounts[EPSC_TRANS]++;
315 // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed
316 ep->scounts[EPSC_FAIL]++;
320 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
321 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
322 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
331 If multi-threading call is turned on, this invokes that mechanism with the special call
332 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
333 behavour (described below) is carried out. This is safe to use when mt is enabled, but
334 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
337 On timeout this function will return a nil pointer. If the original message could not
338 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
341 Call sends the message based on message routing using the message type, and waits for a
342 response message to arrive with the same transaction id that was in the outgoing message.
343 If, while wiating for the expected response, messages are received which do not have the
344 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
345 order that they were received.
347 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
348 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
349 may be resent (likely the context pointer was nil). If the message is sent, but no
350 response is received, a nil message is returned with errno set to indicate the likley
352 ETIMEDOUT -- too many messages were queued before reciving the expected response
353 ENOBUFS -- the queued message ring is full, messages were dropped
354 EINVAL -- A parameter was not valid
355 EAGAIN -- the underlying message system wsa interrupted or the device was busy;
356 user should call this function with the message again.
359 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
362 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
364 msg->state = RMR_ERR_BADARG;
369 return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
373 The outward facing receive function. When invoked it will pop the oldest message
374 from the receive ring, if any are queued, and return it. If the ring is empty
375 then the receive function is invoked to wait for the next message to arrive (blocking).
377 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
378 nil, a new one will be allocated. However, the caller should NOT expect to get the same
379 struct back (if a queued message is returned the message struct will be different).
381 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
383 rmr_mbuf_t* qm; // message that was queued on the ring
385 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
387 if( old_msg != NULL ) {
388 old_msg->state = RMR_ERR_BADARG;
389 old_msg->tp_state = errno;
395 return rmr_mt_rcv( ctx, old_msg, -1 );
399 This allows a timeout based receive for applications unable to implement epoll_wait()
402 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
405 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
407 if( old_msg != NULL ) {
408 old_msg->state = RMR_ERR_BADARG;
409 old_msg->tp_state = errno;
414 return rmr_mt_rcv( ctx, old_msg, ms_to );
418 This blocks until the message with the 'expect' ID is received. Messages which are received
419 before the expected message are queued onto the message ring. The function will return
420 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
421 expected message is received. If the queued message ring fills a nil pointer is returned
422 and errno is set to ENOBUFS.
424 Generally this will be invoked only by the call() function as it waits for a response, but
425 it is exposed to the user application as three is no reason not to.
427 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
429 int queued = 0; // number we pushed into the ring
430 int exp_len = 0; // length of expected ID
432 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
435 msg->state = RMR_ERR_BADARG;
436 msg->tp_state = errno;
443 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
444 return rmr_rcv_msg( ctx, msg );
447 exp_len = strlen( expect );
448 if( exp_len > RMR_MAX_XID ) {
449 exp_len = RMR_MAX_XID;
451 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
453 while( queued < allow2queue ) {
454 msg = rcv_msg( ctx, msg ); // hard wait for next
455 if( msg->state == RMR_OK ) {
456 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
457 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
461 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
462 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
467 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
473 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
479 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
480 _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
481 mechnism indicates eagain or etimeedout. All other error conditions are reported
482 without this delay. Setting a timeout of 0 causes no retries to be attempted in
483 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
484 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
485 after every 1K send attempts until the "time" value is reached. Retries are abandoned
486 if NNG returns anything other than EAGAIN or EINTER is returned.
488 The default, if this function is not used, is 1; meaning that RMr will retry, but will
489 not enter a sleep. In all cases the caller should check the status in the message returned
492 Returns -1 if the context was invalid; RMR_OK otherwise.
494 extern int rmr_set_stimeout( void* vctx, int time ) {
497 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
505 ctx->send_retries = time;
510 Set receive timeout -- not supported in nng implementation
512 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
514 extern int rmr_set_rtimeout( void* vctx, int time ) {
515 rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
521 This is the actual init workhorse. The user visible function meerly ensures that the
522 calling programme does NOT set any internal flags that are supported, and then
523 invokes this. Internal functions (the route table collector) which need additional
524 open ports without starting additional route table collectors, will invoke this
525 directly with the proper flag.
527 CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
528 that we know about. The _user_ should ensure that the supplied length also
529 includes the trace data length maximum as they are in control of that.
531 static void* init( char* uproto_port, int max_msg_size, int flags ) {
532 static int announced = 0;
533 uta_ctx_t* ctx = NULL;
534 char bind_info[256]; // bind info
535 char* proto = "tcp"; // pointer into the proto/port string user supplied
537 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
539 char wbuf[1024]; // work buffer
540 char* tok; // pointer at token in a buffer
542 int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
547 old_vlevel = rmr_vlog_init(); // initialise and get the current level
548 rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
551 rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/c mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
552 RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
555 rmr_set_vlevel( old_vlevel ); // return logging to the desired state
558 if( uproto_port == NULL ) {
559 proto_port = strdup( DEF_COMM_PORT );
561 proto_port = strdup( uproto_port ); // so we can modify it
564 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
568 memset( ctx, 0, sizeof( uta_ctx_t ) );
570 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
571 ctx->nrivers = 256; // number of input flows we'll manage
572 ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
573 memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
574 for( i = 0; i < ctx->nrivers; i++ ) {
575 ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
578 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
579 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
580 ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
581 ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64; // add in our header size and a bit of fudge
583 ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
584 ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
586 if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
587 uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
588 uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
590 rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
592 init_mtcall( ctx ); // set up call chutes
593 fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
596 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
597 if( max_msg_size > 0 ) {
598 ctx->max_plen = max_msg_size;
601 // we're using a listener to get rtg updates, so we do NOT need this.
602 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
604 ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
605 if( ctx->si_ctx == NULL ) {
606 rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
611 if( (port = strchr( proto_port, ':' )) != NULL ) {
612 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
615 *(port++) = 0; // term proto string and point at port string
616 proto = proto_port; // user supplied proto so point at it rather than default
619 port = proto_port; // assume something like "1234" was passed
622 if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
623 if( atoi( tok ) < 1 ) {
628 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
629 tok = strdup( tok ); // something we can destroy
630 if( *tok == '[' ) { // we allow an ipv6 address here
631 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
633 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
635 if( tok2 && *tok2 ) { // if it's not the end of string marker
636 *tok2 = 0; // make it so
639 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
642 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
643 rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
646 if( (tok = strchr( wbuf, '.' )) != NULL ) {
647 *tok = 0; // we don't keep domain portion
651 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
652 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
653 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
657 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
658 if( atoi( tok ) > 0 ) {
659 flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages
663 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
664 if( flags & RMRFL_NAME_ONLY ) {
665 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
667 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
668 if( ctx->my_ip == NULL ) {
669 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
670 strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
673 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
675 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
677 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
682 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
683 interface = "0.0.0.0";
686 snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
687 if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
688 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
693 if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC
695 if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
696 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
699 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
700 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
705 ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
706 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
707 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
715 Initialise the message routing environment. Flags are one of the UTAFL_
716 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
717 (tcp) to be used, then :port is all that is needed.
719 At the moment it seems that TCP really is the only viable protocol, but
720 we'll allow flexibility.
722 The return value is a void pointer which must be passed to most uta functions. On
723 error, a nil pointer is returned and errno should be set.
726 No user flags supported (needed) at the moment, but this provides for extension
727 without drastically changing anything. The user should invoke with RMRFL_NONE to
728 avoid any misbehavour as there are internal flags which are suported
730 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
731 return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
735 This sets the default trace length which will be added to any message buffers
736 allocated. It can be set at any time, and if rmr_set_trace() is given a
737 trace len that is different than the default allcoated in a message, the message
740 Returns 0 on failure and 1 on success. If failure, then errno will be set.
742 extern int rmr_init_trace( void* vctx, int tr_len ) {
746 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
751 ctx->trace_data_len = tr_len;
756 Return true if routing table is initialised etc. and app can send/receive.
758 extern int rmr_ready( void* vctx ) {
761 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
765 if( ctx->rtable != NULL ) {
773 This returns the message queue ring's filedescriptor which can be used for
774 calls to epoll. The user shouild NOT read, write, or close the fd.
776 Returns the file descriptor or -1 on error.
778 extern int rmr_get_rcvfd( void* vctx ) {
782 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
787 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
788 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
793 return uta_ring_getpfd( ctx->mring );
800 There isn't an si_flush() per se, but we can pause, generate
801 a context switch, which should allow the last sent buffer to
802 flow. There isn't exactly an nng_term/close either, so there
803 isn't much we can do.
805 extern void rmr_close( void* vctx ) {
808 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
814 SItp_stats( ctx->si_ctx ); // dump some interesting stats
816 // FIX ME -- how to we turn off si; close all sessions etc?
817 //SIclose( ctx->nn_sock );
822 // ----- multi-threaded call/receive support -------------------------------------------------
825 Blocks on the receive ring chute semaphore and then reads from the ring
826 when it is tickled. If max_wait is -1 then the function blocks until
827 a message is ready on the ring. Else max_wait is assumed to be the number
828 of millaseconds to wait before returning a timeout message.
830 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
832 uta_mhdr_t* hdr; // header in the transport buffer
834 struct timespec ts; // time info if we have a timeout
835 long new_ms; // adjusted mu-sec
836 long seconds = 0; // max wait seconds
837 long nano_sec; // max wait xlated to nano seconds
839 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
841 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
844 mbuf->state = RMR_ERR_BADARG;
845 mbuf->tp_state = errno;
850 ombuf = mbuf; // if we timeout we must return original msg with status, so save it
852 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
854 if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW!
855 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
857 rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
860 mbuf = ombuf; // return original if it was given with timeout status
861 if( ombuf != NULL ) {
862 mbuf->state = RMR_ERR_TIMEOUT; // preset if for failure
871 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
875 clock_gettime( CLOCK_REALTIME, &ts ); // sem timeout based on clock, not a delta
877 if( max_wait > 999 ) {
878 seconds = max_wait / 1000;
879 max_wait -= seconds * 1000;
880 ts.tv_sec += seconds;
883 nano_sec = max_wait * 1000000;
884 ts.tv_nsec += nano_sec;
885 if( ts.tv_nsec > 999999999 ) {
886 ts.tv_nsec -= 999999999;
891 seconds = 1; // use as flag later to invoked timed wait
896 while( state < 0 && errno == EINTR ) {
898 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
900 state = sem_wait( &chute->barrier );
905 mbuf = ombuf; // return caller's buffer if they passed one in
907 errno = 0; // interrupted call state could be left; clear
908 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
909 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
910 mbuf->state = RMR_OK;
913 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
917 mbuf = ombuf; // no buffer, return user's if there
922 mbuf->tp_state = errno;
928 Accept a message buffer and caller ID, send the message and then wait
929 for the receiver to tickle the semaphore letting us know that a message
930 has been received. The call_id is a value between 2 and 255, inclusive; if
931 it's not in this range an error will be returned. Max wait is the amount
932 of time in millaseconds that the call should block for. If 0 is given
933 then no timeout is set.
935 If the mt_call feature has not been initialised, then the attempt to use this
936 funciton will fail with RMR_ERR_NOTSUPP
938 If no matching message is received before the max_wait period expires, a
939 nil pointer is returned, and errno is set to ETIMEOUT. If any other error
940 occurs after the message has been sent, then a nil pointer is returned
941 with errno set to some other value.
943 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
944 rmr_mbuf_t* ombuf; // original mbuf passed in
946 uta_mhdr_t* hdr; // header in the transport buffer
948 unsigned char* d1; // d1 data in header
949 struct timespec ts; // time info if we have a timeout
950 long new_ms; // adjusted mu-sec
951 long seconds = 0; // max wait seconds
952 long nano_sec; // max wait xlated to nano seconds
956 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
958 mbuf->tp_state = errno;
959 mbuf->state = RMR_ERR_BADARG;
964 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
965 mbuf->state = RMR_ERR_NOTSUPP;
966 mbuf->tp_state = errno;
970 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
971 mbuf->state = RMR_ERR_BADARG;
972 mbuf->tp_state = errno;
976 ombuf = mbuf; // save to return timeout status with
978 chute = &ctx->chutes[call_id];
979 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
980 rmr_free_msg( chute->mbuf );
984 hdr = (uta_mhdr_t *) mbuf->header;
985 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
986 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
987 d1 = DATA1_ADDR( hdr );
988 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
989 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
991 if( max_wait >= 0 ) {
992 clock_gettime( CLOCK_REALTIME, &ts );
994 if( max_wait > 999 ) {
995 seconds = max_wait / 1000;
996 max_wait -= seconds * 1000;
997 ts.tv_sec += seconds;
1000 nano_sec = max_wait * 1000000;
1001 ts.tv_nsec += nano_sec;
1002 if( ts.tv_nsec > 999999999 ) {
1003 ts.tv_nsec -= 999999999;
1008 seconds = 1; // use as flag later to invoked timed wait
1011 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1013 if( mbuf->state != RMR_OK ) {
1014 mbuf->tp_state = errno;
1015 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1021 while( chute->mbuf == NULL && ! errno ) {
1023 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1025 state = sem_wait( &chute->barrier );
1028 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1032 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1033 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1034 rmr_free_msg( chute->mbuf );
1042 return NULL; // leave errno as set by sem wait call
1046 mbuf->state = RMR_OK;
1053 Given an existing message buffer, reallocate the payload portion to
1054 be at least new_len bytes. The message header will remain such that
1055 the caller may use the rmr_rts_msg() function to return a payload
1058 The mbuf passed in may or may not be reallocated and the caller must
1059 use the returned pointer and should NOT assume that it can use the
1060 pointer passed in with the exceptions based on the clone flag.
1062 If the clone flag is set, then a duplicated message, with larger payload
1063 size, is allocated and returned. The old_msg pointer in this situation is
1064 still valid and must be explicitly freed by the application. If the clone
1065 message is not set (0), then any memory management of the old message is
1066 handled by the function.
1068 If the copy flag is set, the contents of the old message's payload is
1069 copied to the reallocated payload. If the flag is not set, then the
1070 contents of the payload is undetermined.
1072 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1073 if( old_msg == NULL ) {
1077 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
1081 Enable low latency things in the transport (when supported).
1083 extern void rmr_set_low_latency( void* vctx ) {
1086 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1087 if( ctx->si_ctx != NULL ) {
1088 SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1096 extern void rmr_set_fack( void* vctx ) {
1099 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1100 if( ctx->si_ctx != NULL ) {
1101 SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );