1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the si version of the rmr
24 library (formarly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised comple using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
56 #include "si95/socket_if.h"
57 #include "si95/siproto.h"
60 #include "rmr.h" // things the users see
61 #include "rmr_agnostic.h" // agnostic things (must be included before private)
62 #include "rmr_si_private.h" // things that we need too
63 #include "rmr_symtab.h"
65 #include "ring_static.c" // message ring support
66 #include "rt_generic_static.c" // route table things not transport specific
67 #include "rtable_si_static.c" // route table things -- transport specific
68 #include "rtc_si_static.c" // specific RMR only route table collector (SI only for now)
69 #include "tools_static.c"
70 #include "sr_si_static.c" // send/receive static functions
71 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
72 #include "mt_call_static.c"
73 #include "mt_call_si_static.c"
76 //------------------------------------------------------------------------------
82 static void free_ctx( uta_ctx_t* ctx ) {
85 free( ctx->rtg_addr );
90 // --------------- public functions --------------------------------------------------------------------------
93 Returns the size of the payload (bytes) that the msg buffer references.
94 Len in a message is the number of bytes which were received, or should
95 be transmitted, however, it is possible that the mbuf was allocated
96 with a larger payload space than the payload length indicates; this
97 function returns the absolute maximum space that the user has available
98 in the payload. On error (bad msg buffer) -1 is returned and errno should
101 The allocated len stored in the msg is:
102 transport header length +
104 user requested payload
106 The msg header is a combination of the fixed RMR header and the variable
107 trace data and d2 fields which may vary for each message.
109 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
110 if( msg == NULL || msg->header == NULL ) {
116 return msg->alloc_len - RMR_HDR_LEN( msg->header ) - TP_HDR_LEN; // allocated transport size less the header and other data bits
120 Allocates a send message as a zerocopy message allowing the underlying message protocol
121 to send the buffer without copy.
123 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
127 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
131 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
137 Allocates a send message as a zerocopy message allowing the underlying message protocol
138 to send the buffer without copy. In addition, a trace data field of tr_size will be
139 added and the supplied data coppied to the buffer before returning the message to
142 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
147 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
151 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
153 state = rmr_set_trace( m, data, tr_size ); // roll their data in
154 if( state != tr_size ) {
155 m->state = RMR_ERR_INITFAILED;
163 This provides an external path to the realloc static function as it's called by an
164 outward facing mbuf api function. Used to reallocate a message with a different
167 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
168 return realloc_msg( msg, new_tr_size );
173 Return the message to the available pool, or free it outright.
175 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
176 //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
183 if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
185 free( mbuf->tp_buf );
192 This is a wrapper to the real timeout send. We must wrap it now to ensure that
193 the call flag and call-id are reset
195 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
196 char* d1; // point at the call-id in the header
199 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
201 d1 = DATA1_ADDR( msg->header );
202 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
205 return mtosend_msg( vctx, msg, max_to );
209 Send with default max timeout as is set in the context.
210 See rmr_mtosend_msg() for more details on the parameters.
211 See rmr_stimeout() for info on setting the default timeout.
213 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
214 char* d1; // point at the call-id in the header
217 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
219 d1 = DATA1_ADDR( msg->header );
220 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
223 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
227 Return to sender allows a message to be sent back to the endpoint where it originated.
229 In the SI world the file descriptor that was the source of the message is captured in
230 the mbuffer and thus can be used to quickly find the target for an RTS call.
232 The source information in the message is used to select the socket on which to write
233 the message rather than using the message type and round-robin selection. This
234 should return a message buffer with the state of the send operation set. On success
235 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
236 error it can be passed back to this function to retry the send if desired. On error,
237 errno will liklely have the failure reason set by the nng send processing.
238 The following are possible values for the state in the message buffer:
240 Message states returned:
241 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
242 RMR_ERR_NOHDR - message did not have a header
243 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
244 RMR_ERR_SENDFAILED - send failed; errno has nano error code
245 RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN)
247 A nil message as the return value is rare, and generally indicates some kind of horrible
248 failure. The value of errno might give a clue as to what is wrong.
251 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
252 The caller must check for this and handle it properly.
254 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
255 int nn_sock; // endpoint socket for send
258 char* hold_src; // we need the original source if send fails
259 char* hold_ip; // also must hold original ip
260 int sock_ok = 0; // true if we found a valid endpoint socket
261 endpoint_t* ep = NULL; // end point to track counts
263 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
264 errno = EINVAL; // if msg is null, this is their clue
266 msg->state = RMR_ERR_BADARG;
267 msg->tp_state = errno;
272 errno = 0; // at this point any bad state is in msg returned
273 if( msg->header == NULL ) {
274 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
275 msg->state = RMR_ERR_NOHDR;
276 msg->tp_state = errno;
280 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
283 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx ); // src is always used first for rts
286 if( (nn_sock = msg->rts_fd) < 0 ) {
287 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
288 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
291 msg->state = RMR_ERR_NOENDPT;
292 return msg; // preallocated msg can be reused since not given back to nn
297 msg->state = RMR_OK; // ensure it is clear before send
298 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
299 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
300 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
301 msg = send_msg( ctx, msg, nn_sock, -1 );
304 switch( msg->state ) {
306 ep->scounts[EPSC_GOOD]++;
310 ep->scounts[EPSC_TRANS]++;
314 // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed
315 ep->scounts[EPSC_FAIL]++;
319 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
320 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
321 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
330 If multi-threading call is turned on, this invokes that mechanism with the special call
331 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
332 behavour (described below) is carried out. This is safe to use when mt is enabled, but
333 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
336 On timeout this function will return a nil pointer. If the original message could not
337 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
340 Call sends the message based on message routing using the message type, and waits for a
341 response message to arrive with the same transaction id that was in the outgoing message.
342 If, while wiating for the expected response, messages are received which do not have the
343 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
344 order that they were received.
346 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
347 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
348 may be resent (likely the context pointer was nil). If the message is sent, but no
349 response is received, a nil message is returned with errno set to indicate the likley
351 ETIMEDOUT -- too many messages were queued before reciving the expected response
352 ENOBUFS -- the queued message ring is full, messages were dropped
353 EINVAL -- A parameter was not valid
354 EAGAIN -- the underlying message system wsa interrupted or the device was busy;
355 user should call this function with the message again.
358 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
361 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
363 msg->state = RMR_ERR_BADARG;
368 return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
372 The outward facing receive function. When invoked it will pop the oldest message
373 from the receive ring, if any are queued, and return it. If the ring is empty
374 then the receive function is invoked to wait for the next message to arrive (blocking).
376 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
377 nil, a new one will be allocated. However, the caller should NOT expect to get the same
378 struct back (if a queued message is returned the message struct will be different).
380 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
382 rmr_mbuf_t* qm; // message that was queued on the ring
384 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
386 if( old_msg != NULL ) {
387 old_msg->state = RMR_ERR_BADARG;
388 old_msg->tp_state = errno;
394 return rmr_mt_rcv( ctx, old_msg, -1 );
398 This allows a timeout based receive for applications unable to implement epoll_wait()
401 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
404 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
406 if( old_msg != NULL ) {
407 old_msg->state = RMR_ERR_BADARG;
408 old_msg->tp_state = errno;
413 return rmr_mt_rcv( ctx, old_msg, ms_to );
417 This blocks until the message with the 'expect' ID is received. Messages which are received
418 before the expected message are queued onto the message ring. The function will return
419 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
420 expected message is received. If the queued message ring fills a nil pointer is returned
421 and errno is set to ENOBUFS.
423 Generally this will be invoked only by the call() function as it waits for a response, but
424 it is exposed to the user application as three is no reason not to.
426 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
428 int queued = 0; // number we pushed into the ring
429 int exp_len = 0; // length of expected ID
431 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
434 msg->state = RMR_ERR_BADARG;
435 msg->tp_state = errno;
442 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
443 return rmr_rcv_msg( ctx, msg );
446 exp_len = strlen( expect );
447 if( exp_len > RMR_MAX_XID ) {
448 exp_len = RMR_MAX_XID;
450 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
452 while( queued < allow2queue ) {
453 msg = rcv_msg( ctx, msg ); // hard wait for next
454 if( msg->state == RMR_OK ) {
455 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
456 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
460 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
461 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
466 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
472 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
478 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
479 _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
480 mechnism indicates eagain or etimeedout. All other error conditions are reported
481 without this delay. Setting a timeout of 0 causes no retries to be attempted in
482 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
483 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
484 after every 1K send attempts until the "time" value is reached. Retries are abandoned
485 if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
487 The default, if this function is not used, is 1; meaning that RMr will retry, but will
488 not enter a sleep. In all cases the caller should check the status in the message returned
491 Returns -1 if the context was invalid; RMR_OK otherwise.
493 extern int rmr_set_stimeout( void* vctx, int time ) {
496 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
504 ctx->send_retries = time;
509 Set receive timeout -- not supported in nng implementation
511 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
513 extern int rmr_set_rtimeout( void* vctx, int time ) {
514 fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
520 This is the actual init workhorse. The user visible function meerly ensures that the
521 calling programme does NOT set any internal flags that are supported, and then
522 invokes this. Internal functions (the route table collector) which need additional
523 open ports without starting additional route table collectors, will invoke this
524 directly with the proper flag.
526 static void* init( char* uproto_port, int max_msg_size, int flags ) {
527 static int announced = 0;
528 uta_ctx_t* ctx = NULL;
529 char bind_info[256]; // bind info
530 char* proto = "tcp"; // pointer into the proto/port string user supplied
532 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
534 char wbuf[1024]; // work buffer
535 char* tok; // pointer at token in a buffer
537 int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
542 fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
543 RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
548 if( uproto_port == NULL ) {
549 proto_port = strdup( DEF_COMM_PORT );
551 proto_port = strdup( uproto_port ); // so we can modify it
554 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
558 memset( ctx, 0, sizeof( uta_ctx_t ) );
560 if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
561 ctx->nrivers = 256; // number of input flows we'll manage
562 ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
563 memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
564 for( i = 0; i < ctx->nrivers; i++ ) {
565 ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
568 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
569 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
570 ctx->max_ibm = max_msg_size; // default to user supplied message size
572 ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
573 ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
575 if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
576 uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
577 uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
579 fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" );
581 init_mtcall( ctx ); // set up call chutes
584 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
585 if( max_msg_size > 0 ) {
586 ctx->max_plen = max_msg_size;
589 // we're using a listener to get rtg updates, so we do NOT need this.
590 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
592 ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
593 if( ctx->si_ctx == NULL ) {
594 fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
599 if( (port = strchr( proto_port, ':' )) != NULL ) {
600 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
603 *(port++) = 0; // term proto string and point at port string
604 proto = proto_port; // user supplied proto so point at it rather than default
607 port = proto_port; // assume something like "1234" was passed
610 if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
611 if( atoi( tok ) < 1 ) {
616 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
617 tok = strdup( tok ); // something we can destroy
618 if( *tok == '[' ) { // we allow an ipv6 address here
619 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
621 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
623 if( tok2 && *tok2 ) { // if it's not the end of string marker
624 *tok2 = 0; // make it so
627 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
630 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
631 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
634 if( (tok = strchr( wbuf, '.' )) != NULL ) {
635 *tok = 0; // we don't keep domain portion
639 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
640 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
641 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
645 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
646 if( atoi( tok ) > 0 ) {
647 flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages
651 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
652 if( flags & RMRFL_NAME_ONLY ) {
653 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
655 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
656 if( ctx->my_ip == NULL ) {
657 fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
658 strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
661 if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
663 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
665 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
670 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
671 interface = "0.0.0.0";
674 snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
675 if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
676 fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
681 if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC
683 if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
684 fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
687 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
688 fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
693 ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
694 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
695 fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
703 Initialise the message routing environment. Flags are one of the UTAFL_
704 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
705 (tcp) to be used, then :port is all that is needed.
707 At the moment it seems that TCP really is the only viable protocol, but
708 we'll allow flexibility.
710 The return value is a void pointer which must be passed to most uta functions. On
711 error, a nil pointer is returned and errno should be set.
714 No user flags supported (needed) at the moment, but this provides for extension
715 without drastically changing anything. The user should invoke with RMRFL_NONE to
716 avoid any misbehavour as there are internal flags which are suported
718 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
719 return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
723 This sets the default trace length which will be added to any message buffers
724 allocated. It can be set at any time, and if rmr_set_trace() is given a
725 trace len that is different than the default allcoated in a message, the message
728 Returns 0 on failure and 1 on success. If failure, then errno will be set.
730 extern int rmr_init_trace( void* vctx, int tr_len ) {
734 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
739 ctx->trace_data_len = tr_len;
744 Return true if routing table is initialised etc. and app can send/receive.
746 extern int rmr_ready( void* vctx ) {
749 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
753 if( ctx->rtable != NULL ) {
761 This returns the message queue ring's filedescriptor which can be used for
762 calls to epoll. The user shouild NOT read, write, or close the fd.
764 Returns the file descriptor or -1 on error.
766 extern int rmr_get_rcvfd( void* vctx ) {
770 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
775 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
776 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
781 return uta_ring_getpfd( ctx->mring );
788 There isn't an si_flush() per se, but we can pause, generate
789 a context switch, which should allow the last sent buffer to
790 flow. There isn't exactly an nng_term/close either, so there
791 isn't much we can do.
793 extern void rmr_close( void* vctx ) {
796 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
802 SItp_stats( ctx->si_ctx ); // dump some interesting stats
804 // FIX ME -- how to we turn off si; close all sessions etc?
805 //SIclose( ctx->nn_sock );
810 // ----- multi-threaded call/receive support -------------------------------------------------
813 Blocks on the receive ring chute semaphore and then reads from the ring
814 when it is tickled. If max_wait is -1 then the function blocks until
815 a message is ready on the ring. Else max_wait is assumed to be the number
816 of millaseconds to wait before returning a timeout message.
818 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
820 uta_mhdr_t* hdr; // header in the transport buffer
822 struct timespec ts; // time info if we have a timeout
823 long new_ms; // adjusted mu-sec
824 long seconds = 0; // max wait seconds
825 long nano_sec; // max wait xlated to nano seconds
827 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
829 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
832 mbuf->state = RMR_ERR_BADARG;
833 mbuf->tp_state = errno;
838 ombuf = mbuf; // if we timeout we must return original msg with status, so save it
840 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
842 if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW!
843 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
845 rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
848 mbuf = ombuf; // return original if it was given with timeout status
849 if( ombuf != NULL ) {
850 mbuf->state = RMR_ERR_TIMEOUT; // preset if for failure
859 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
863 clock_gettime( CLOCK_REALTIME, &ts ); // sem timeout based on clock, not a delta
865 if( max_wait > 999 ) {
866 seconds = max_wait / 1000;
867 max_wait -= seconds * 1000;
868 ts.tv_sec += seconds;
871 nano_sec = max_wait * 1000000;
872 ts.tv_nsec += nano_sec;
873 if( ts.tv_nsec > 999999999 ) {
874 ts.tv_nsec -= 999999999;
879 seconds = 1; // use as flag later to invoked timed wait
884 while( state < 0 && errno == EINTR ) {
886 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
888 state = sem_wait( &chute->barrier );
893 mbuf = ombuf; // return caller's buffer if they passed one in
895 errno = 0; // interrupted call state could be left; clear
896 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
897 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
898 mbuf->state = RMR_OK;
901 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
905 mbuf = ombuf; // no buffer, return user's if there
910 mbuf->tp_state = errno;
916 Accept a message buffer and caller ID, send the message and then wait
917 for the receiver to tickle the semaphore letting us know that a message
918 has been received. The call_id is a value between 2 and 255, inclusive; if
919 it's not in this range an error will be returned. Max wait is the amount
920 of time in millaseconds that the call should block for. If 0 is given
921 then no timeout is set.
923 If the mt_call feature has not been initialised, then the attempt to use this
924 funciton will fail with RMR_ERR_NOTSUPP
926 If no matching message is received before the max_wait period expires, a
927 nil pointer is returned, and errno is set to ETIMEOUT. If any other error
928 occurs after the message has been sent, then a nil pointer is returned
929 with errno set to some other value.
931 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
932 rmr_mbuf_t* ombuf; // original mbuf passed in
934 uta_mhdr_t* hdr; // header in the transport buffer
936 unsigned char* d1; // d1 data in header
937 struct timespec ts; // time info if we have a timeout
938 long new_ms; // adjusted mu-sec
939 long seconds = 0; // max wait seconds
940 long nano_sec; // max wait xlated to nano seconds
944 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
946 mbuf->tp_state = errno;
947 mbuf->state = RMR_ERR_BADARG;
952 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
953 mbuf->state = RMR_ERR_NOTSUPP;
954 mbuf->tp_state = errno;
958 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
959 mbuf->state = RMR_ERR_BADARG;
960 mbuf->tp_state = errno;
964 ombuf = mbuf; // save to return timeout status with
966 chute = &ctx->chutes[call_id];
967 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
968 rmr_free_msg( chute->mbuf );
972 hdr = (uta_mhdr_t *) mbuf->header;
973 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
974 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
975 d1 = DATA1_ADDR( hdr );
976 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
977 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
979 if( max_wait >= 0 ) {
980 clock_gettime( CLOCK_REALTIME, &ts );
982 if( max_wait > 999 ) {
983 seconds = max_wait / 1000;
984 max_wait -= seconds * 1000;
985 ts.tv_sec += seconds;
988 nano_sec = max_wait * 1000000;
989 ts.tv_nsec += nano_sec;
990 if( ts.tv_nsec > 999999999 ) {
991 ts.tv_nsec -= 999999999;
996 seconds = 1; // use as flag later to invoked timed wait
999 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1001 if( mbuf->state != RMR_OK ) {
1002 mbuf->tp_state = errno;
1003 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1009 while( chute->mbuf == NULL && ! errno ) {
1011 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1013 state = sem_wait( &chute->barrier );
1016 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1020 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1021 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1022 rmr_free_msg( chute->mbuf );
1030 return NULL; // leave errno as set by sem wait call
1034 mbuf->state = RMR_OK;
1041 Given an existing message buffer, reallocate the payload portion to
1042 be at least new_len bytes. The message header will remain such that
1043 the caller may use the rmr_rts_msg() function to return a payload
1046 The mbuf passed in may or may not be reallocated and the caller must
1047 use the returned pointer and should NOT assume that it can use the
1048 pointer passed in with the exceptions based on the clone flag.
1050 If the clone flag is set, then a duplicated message, with larger payload
1051 size, is allocated and returned. The old_msg pointer in this situation is
1052 still valid and must be explicitly freed by the application. If the clone
1053 message is not set (0), then any memory management of the old message is
1054 handled by the function.
1056 If the copy flag is set, the contents of the old message's payload is
1057 copied to the reallocated payload. If the flag is not set, then the
1058 contents of the payload is undetermined.
1060 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1061 if( old_msg == NULL ) {
1065 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
1069 Enable low latency things in the transport (when supported).
1071 extern void rmr_set_low_latency( void* vctx ) {
1074 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1075 if( ctx->si_ctx != NULL ) {
1076 SIset_tflags( ctx->si_ctx, SI_TF_NODELAY );
1084 extern void rmr_set_fack( void* vctx ) {
1087 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
1088 if( ctx->si_ctx != NULL ) {
1089 SIset_tflags( ctx->si_ctx, SI_TF_FASTACK );