1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the nng version of the rmr
24 library (formerly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised compile using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <semaphore.h>

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <nng/protocol/pipeline0/push.h>
#include <nng/protocol/pipeline0/pull.h>

#include "rmr.h"				// things the users see
#include "rmr_agnostic.h"		// agnostic things (must be included before private)
#include "rmr_nng_private.h"	// things that we need too
#include "rmr_symtab.h"

#include "ring_static.c"		// message ring support
#include "rt_generic_static.c"	// route table things not transport specific
#include "rtable_nng_static.c"	// route table things -- transport specific
#include "rtc_static.c"			// route table collector
#include "tools_static.c"
#include "sr_nng_static.c"		// send/receive static functions
#include "wormholes.c"			// wormhole api externals and related static functions (must be LAST!)
#include "mt_call_static.c"
#include "mt_call_nng_static.c"
79 //------------------------------------------------------------------------------
85 static void free_ctx( uta_ctx_t* ctx ) {
88 free( ctx->rtg_addr );
93 // --------------- public functions --------------------------------------------------------------------------
96 Returns the size of the payload (bytes) that the msg buffer references.
97 Len in a message is the number of bytes which were received, or should
98 be transmitted, however, it is possible that the mbuf was allocated
99 with a larger payload space than the payload length indicates; this
100 function returns the absolute maximum space that the user has available
101 in the payload. On error (bad msg buffer) -1 is returned and errno should
104 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
105 if( msg == NULL || msg->header == NULL ) {
111 return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits
115 Allocates a send message as a zerocopy message allowing the underlying message protocol
116 to send the buffer without copy.
118 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
122 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
126 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
132 Allocates a send message as a zerocopy message allowing the underlying message protocol
133 to send the buffer without copy. In addition, a trace data field of tr_size will be
134 added and the supplied data coppied to the buffer before returning the message to
137 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
142 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
146 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
148 state = rmr_set_trace( m, data, tr_size ); // roll their data in
149 if( state != tr_size ) {
150 m->state = RMR_ERR_INITFAILED;
158 This provides an external path to the realloc static function as it's called by an
159 outward facing mbuf api function. Used to reallocate a message with a different
162 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
163 return realloc_msg( msg, new_tr_size );
168 Return the message to the available pool, or free it outright.
170 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
176 if( mbuf->flags & MFL_ZEROCOPY ) {
177 //nng_free( (void *) mbuf->header, mbuf->alloc_len );
179 nng_msg_free( mbuf->tp_buf );
188 This is a wrapper to the real timeout send. We must wrap it now to ensure that
189 the call flag and call-id are reset
191 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
192 char* d1; // point at the call-id in the header
195 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
197 d1 = DATA1_ADDR( msg->header );
198 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
201 return mtosend_msg( vctx, msg, max_to );
205 Send with default max timeout as is set in the context.
206 See rmr_mtosend_msg() for more details on the parameters.
207 See rmr_stimeout() for info on setting the default timeout.
209 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
210 char* d1; // point at the call-id in the header
213 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
215 d1 = DATA1_ADDR( msg->header );
216 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
219 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
223 Return to sender allows a message to be sent back to the endpoint where it originated.
224 The source information in the message is used to select the socket on which to write
225 the message rather than using the message type and round-robin selection. This
226 should return a message buffer with the state of the send operation set. On success
227 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
228 error it can be passed back to this function to retry the send if desired. On error,
229 errno will liklely have the failure reason set by the nng send processing.
230 The following are possible values for the state in the message buffer:
232 Message states returned:
233 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
234 RMR_ERR_NOHDR - message did not have a header
235 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
236 RMR_ERR_SENDFAILED - send failed; errno has nano error code
237 RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN)
239 A nil message as the return value is rare, and generally indicates some kind of horrible
240 failure. The value of errno might give a clue as to what is wrong.
243 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
244 The caller must check for this and handle.
246 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
247 nng_socket nn_sock; // endpoint socket for send
250 char* hold_src; // we need the original source if send fails
251 char* hold_ip; // also must hold original ip
252 int sock_ok = 0; // true if we found a valid endpoint socket
253 endpoint_t* ep; // end point to track counts
255 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
256 errno = EINVAL; // if msg is null, this is their clue
258 msg->state = RMR_ERR_BADARG;
259 msg->tp_state = errno;
264 errno = 0; // at this point any bad state is in msg returned
265 if( msg->header == NULL ) {
266 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
267 msg->state = RMR_ERR_NOHDR;
268 msg->tp_state = errno;
272 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
274 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts
276 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
277 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
280 msg->state = RMR_ERR_NOENDPT;
281 return msg; // preallocated msg can be reused since not given back to nn
285 msg->state = RMR_OK; // ensure it is clear before send
286 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
287 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
288 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
289 msg = send_msg( ctx, msg, nn_sock, -1 );
292 switch( msg->state ) {
294 ep->scounts[EPSC_GOOD]++;
298 ep->scounts[EPSC_TRANS]++;
302 ep->scounts[EPSC_FAIL]++;
306 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
307 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
308 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
317 If multi-threading call is turned on, this invokes that mechanism with the special call
318 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
319 behavour (described below) is carried out. This is safe to use when mt is enabled, but
320 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
323 On timeout this function will return a nil pointer. If the original message could not
324 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
327 Call sends the message based on message routing using the message type, and waits for a
328 response message to arrive with the same transaction id that was in the outgoing message.
329 If, while wiating for the expected response, messages are received which do not have the
330 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
331 order that they were received.
333 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
334 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
335 may be resent (likely the context pointer was nil). If the message is sent, but no
336 response is received, a nil message is returned with errno set to indicate the likley
338 ETIMEDOUT -- too many messages were queued before reciving the expected response
339 ENOBUFS -- the queued message ring is full, messages were dropped
340 EINVAL -- A parameter was not valid
341 EAGAIN -- the underlying message system wsa interrupted or the device was busy;
342 user should call this function with the message again.
345 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
347 unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID
349 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
351 msg->state = RMR_ERR_BADARG;
356 if( ctx->flags & CFL_MTC_ENABLED ) { // if multi threaded call is on, use that
357 return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
360 memcpy( expected_id, msg->xaction, RMR_MAX_XID );
361 expected_id[RMR_MAX_XID] = 0; // ensure it's a string
362 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
364 msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
366 msg = rmr_send_msg( ctx, msg );
367 if( msg ) { // msg should be nil, if not there was a problem; return buffer to user
368 if( msg->state != RMR_ERR_RETRY ) {
369 msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry
371 msg->tp_state = errno;
375 return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead
379 The outward facing receive function. When invoked it will pop the oldest message
380 from the receive ring, if any are queued, and return it. If the ring is empty
381 then the receive function is invoked to wait for the next message to arrive (blocking).
383 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
384 nil, a new one will be allocated. However, the caller should NOT expect to get the same
385 struct back (if a queued message is returned the message struct will be different).
387 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
389 rmr_mbuf_t* qm; // message that was queued on the ring
391 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
393 if( old_msg != NULL ) {
394 old_msg->state = RMR_ERR_BADARG;
395 old_msg->tp_state = errno;
401 if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first
402 return rmr_mt_rcv( ctx, old_msg, -1 );
405 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
408 rmr_free_msg( old_msg ); // future: push onto a free list???
414 return rcv_msg( ctx, old_msg ); // nothing queued, wait for one
418 This implements a receive with a timeout via epoll. Mostly this is for
419 wrappers as native C applications can use epoll directly and will not have
422 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
423 struct epoll_stuff* eps; // convience pointer
425 rmr_mbuf_t* qm; // message that was queued on the ring
429 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
431 if( old_msg != NULL ) {
432 old_msg->state = RMR_ERR_BADARG;
433 old_msg->tp_state = errno;
438 if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first
439 return rmr_mt_rcv( ctx, old_msg, ms_to );
442 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
445 rmr_free_msg( old_msg ); // future: push onto a free list???
451 if( (eps = ctx->eps) == NULL ) { // set up epoll on first call
452 eps = malloc( sizeof *eps );
454 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
455 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
460 eps->nng_fd = rmr_get_rcvfd( ctx );
461 eps->epe.events = EPOLLIN;
462 eps->epe.data.fd = eps->nng_fd;
464 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
465 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
476 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
483 nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout
484 if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours
485 msg->state = RMR_ERR_TIMEOUT;
486 msg->tp_state = errno;
488 return rcv_msg( ctx, msg ); // receive it and return it
491 return msg; // return empty message with state set
495 This blocks until the message with the 'expect' ID is received. Messages which are received
496 before the expected message are queued onto the message ring. The function will return
497 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
498 expected message is received. If the queued message ring fills a nil pointer is returned
499 and errno is set to ENOBUFS.
501 Generally this will be invoked only by the call() function as it waits for a response, but
502 it is exposed to the user application as three is no reason not to.
504 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
506 int queued = 0; // number we pushed into the ring
507 int exp_len = 0; // length of expected ID
509 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
512 msg->state = RMR_ERR_BADARG;
513 msg->tp_state = errno;
520 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
521 return rmr_rcv_msg( ctx, msg );
524 exp_len = strlen( expect );
525 if( exp_len > RMR_MAX_XID ) {
526 exp_len = RMR_MAX_XID;
528 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
530 while( queued < allow2queue ) {
531 msg = rcv_msg( ctx, msg ); // hard wait for next
532 if( msg->state == RMR_OK ) {
533 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
534 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
538 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
539 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
544 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
550 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
556 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
557 _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
558 mechnism indicates eagain or etimeedout. All other error conditions are reported
559 without this delay. Setting a timeout of 0 causes no retries to be attempted in
560 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
561 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
562 after every 1K send attempts until the "time" value is reached. Retries are abandoned
563 if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
565 The default, if this function is not used, is 1; meaning that RMr will retry, but will
566 not enter a sleep. In all cases the caller should check the status in the message returned
569 Returns -1 if the context was invalid; RMR_OK otherwise.
571 extern int rmr_set_stimeout( void* vctx, int time ) {
574 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
582 ctx->send_retries = time;
/*
	Set receive timeout -- not supported in nng implementation.

	CAUTION: this is not supported as receive timeouts must be set
	differently (between create and open) in NNG. A warning is written to
	stderr; both parameters are ignored and 0 is returned.
*/
extern int rmr_set_rtimeout( void* vctx, int time ) {
	(void) vctx;
	(void) time;

	fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
	return 0;
}
598 This is the actual init workhorse. The user visible function meerly ensures that the
599 calling programme does NOT set any internal flags that are supported, and then
600 invokes this. Internal functions (the route table collector) which need additional
601 open ports without starting additional route table collectors, will invoke this
602 directly with the proper flag.
604 static void* init( char* uproto_port, int max_msg_size, int flags ) {
605 static int announced = 0;
606 uta_ctx_t* ctx = NULL;
607 char bind_info[NNG_MAXADDRLEN]; // bind info
608 char* proto = "tcp"; // pointer into the proto/port string user supplied
610 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
612 char wbuf[1024]; // work buffer
613 char* tok; // pointer at token in a buffer
618 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
619 RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
624 if( uproto_port == NULL ) {
625 proto_port = strdup( DEF_COMM_PORT );
627 proto_port = strdup( uproto_port ); // so we can modify it
630 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
634 memset( ctx, 0, sizeof( uta_ctx_t ) );
636 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
637 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
639 if( flags & RMRFL_MTCALL ) { // mt call support is on, need bigger ring
640 ctx->mring = uta_mk_ring( 2048 ); // message ring filled by rcv thread
641 init_mtcall( ctx ); // set up call chutes
643 ctx->mring = uta_mk_ring( 128 ); // ring filled only on blocking call
646 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
647 if( max_msg_size > 0 ) {
648 ctx->max_plen = max_msg_size;
651 // we're using a listener to get rtg updates, so we do NOT need this.
652 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
654 if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode
655 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
660 if( (port = strchr( proto_port, ':' )) != NULL ) {
661 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
664 *(port++) = 0; // term proto string and point at port string
665 proto = proto_port; // user supplied proto so point at it rather than default
668 port = proto_port; // assume something like "1234" was passed
671 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
672 tok = strdup( tok ); // something we can destroy
673 if( *tok == '[' ) { // we allow an ipv6 address here
674 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
676 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
678 if( tok2 && *tok2 ) { // if it's not the end of string marker
679 *tok2 = 0; // make it so
682 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
685 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
686 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
689 if( (tok = strchr( wbuf, '.' )) != NULL ) {
690 *tok = 0; // we don't keep domain portion
694 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
695 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
696 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
700 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
701 if( atoi( tok ) > 0 ) {
702 flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages
706 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
707 if( flags & RMRFL_NAME_ONLY ) {
708 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
710 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
711 if( ctx->my_ip == NULL ) {
712 fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
713 strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
716 if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
718 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
720 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
725 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
726 interface = "0.0.0.0";
728 // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
729 // rather than using this generic listen() call.
730 snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
731 if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
732 fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
733 nng_close( ctx->nn_sock );
738 if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc
739 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
740 fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
744 if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running
745 ctx->flags |= CFL_MTC_ENABLED;
746 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver
747 fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
757 Initialise the message routing environment. Flags are one of the UTAFL_
758 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
759 (tcp) to be used, then :port is all that is needed.
761 At the moment it seems that TCP really is the only viable protocol, but
762 we'll allow flexibility.
764 The return value is a void pointer which must be passed to most uta functions. On
765 error, a nil pointer is returned and errno should be set.
768 No user flags supported (needed) at the moment, but this provides for extension
769 without drastically changing anything. The user should invoke with RMRFL_NONE to
770 avoid any misbehavour as there are internal flags which are suported
772 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
773 return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
777 This sets the default trace length which will be added to any message buffers
778 allocated. It can be set at any time, and if rmr_set_trace() is given a
779 trace len that is different than the default allcoated in a message, the message
782 Returns 0 on failure and 1 on success. If failure, then errno will be set.
784 extern int rmr_init_trace( void* vctx, int tr_len ) {
788 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
793 ctx->trace_data_len = tr_len;
798 Return true if routing table is initialised etc. and app can send/receive.
800 extern int rmr_ready( void* vctx ) {
803 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
807 if( ctx->rtable != NULL ) {
815 Returns a file descriptor which can be used with epoll() to signal a receive
816 pending. The file descriptor should NOT be read from directly, nor closed, as NNG
817 does not support this.
819 extern int rmr_get_rcvfd( void* vctx ) {
824 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
828 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
829 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
840 There isn't an nng_flush() per se, but we can pause, generate
841 a context switch, which should allow the last sent buffer to
842 flow. There isn't exactly an nng_term/close either, so there
843 isn't much we can do.
845 extern void rmr_close( void* vctx ) {
848 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
853 nng_close( ctx->nn_sock );
857 // ----- multi-threaded call/receive support -------------------------------------------------
860 Blocks on the receive ring chute semaphore and then reads from the ring
861 when it is tickled. If max_wait is -1 then the function blocks until
862 a message is ready on the ring. Else max_wait is assumed to be the number
863 of millaseconds to wait before returning a timeout message.
865 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
867 uta_mhdr_t* hdr; // header in the transport buffer
869 struct timespec ts; // time info if we have a timeout
870 long new_ms; // adjusted mu-sec
871 long seconds = 0; // max wait seconds
872 long nano_sec; // max wait xlated to nano seconds
874 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
876 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
879 mbuf->state = RMR_ERR_BADARG;
880 mbuf->tp_state = errno;
885 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
888 mbuf->state = RMR_ERR_NOTSUPP;
889 mbuf->tp_state = errno;
896 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
900 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
902 if( max_wait >= 0 ) {
903 clock_gettime( CLOCK_REALTIME, &ts );
905 if( max_wait > 999 ) {
906 seconds = max_wait / 1000;
907 max_wait -= seconds * 1000;
908 ts.tv_sec += seconds;
911 nano_sec = max_wait * 1000000;
912 ts.tv_nsec += nano_sec;
913 if( ts.tv_nsec > 999999999 ) {
914 ts.tv_nsec -= 999999999;
919 seconds = 1; // use as flag later to invoked timed wait
924 while( state < 0 && errno == EINTR ) {
926 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
928 state = sem_wait( &chute->barrier );
933 mbuf = ombuf; // return caller's buffer if they passed one in
935 errno = 0; // interrupted call state could be left; clear
936 if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
937 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
938 mbuf->state = RMR_OK;
941 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
945 mbuf = ombuf; // no buffer, return user's if there
950 mbuf->tp_state = errno;
956 Accept a message buffer and caller ID, send the message and then wait
957 for the receiver to tickle the semaphore letting us know that a message
958 has been received. The call_id is a value between 2 and 255, inclusive; if
959 it's not in this range an error will be returned. Max wait is the amount
960 of time in millaseconds that the call should block for. If 0 is given
961 then no timeout is set.
963 If the mt_call feature has not been initialised, then the attempt to use this
964 funciton will fail with RMR_ERR_NOTSUPP
966 If no matching message is received before the max_wait period expires, a
967 nil pointer is returned, and errno is set to ETIMEOUT. If any other error
968 occurs after the message has been sent, then a nil pointer is returned
969 with errno set to some other value.
971 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
972 rmr_mbuf_t* ombuf; // original mbuf passed in
974 uta_mhdr_t* hdr; // header in the transport buffer
976 unsigned char* d1; // d1 data in header
977 struct timespec ts; // time info if we have a timeout
978 long new_ms; // adjusted mu-sec
979 long seconds = 0; // max wait seconds
980 long nano_sec; // max wait xlated to nano seconds
984 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
986 mbuf->tp_state = errno;
987 mbuf->state = RMR_ERR_BADARG;
992 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
993 mbuf->state = RMR_ERR_NOTSUPP;
994 mbuf->tp_state = errno;
998 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
999 mbuf->state = RMR_ERR_BADARG;
1000 mbuf->tp_state = errno;
1004 ombuf = mbuf; // save to return timeout status with
1006 chute = &ctx->chutes[call_id];
1007 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
1008 rmr_free_msg( chute->mbuf );
1012 hdr = (uta_mhdr_t *) mbuf->header;
1013 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
1014 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
1015 d1 = DATA1_ADDR( hdr );
1016 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
1017 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
1019 if( max_wait >= 0 ) {
1020 clock_gettime( CLOCK_REALTIME, &ts );
1022 if( max_wait > 999 ) {
1023 seconds = max_wait / 1000;
1024 max_wait -= seconds * 1000;
1025 ts.tv_sec += seconds;
1027 if( max_wait > 0 ) {
1028 nano_sec = max_wait * 1000000;
1029 ts.tv_nsec += nano_sec;
1030 if( ts.tv_nsec > 999999999 ) {
1031 ts.tv_nsec -= 999999999;
1036 seconds = 1; // use as flag later to invoked timed wait
1039 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1041 if( mbuf->state != RMR_OK ) {
1042 mbuf->tp_state = errno;
1043 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1049 while( chute->mbuf == NULL && ! errno ) {
1051 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1053 state = sem_wait( &chute->barrier );
1056 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1060 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1061 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1062 rmr_free_msg( chute->mbuf );
1070 return NULL; // leave errno as set by sem wait call
1074 mbuf->state = RMR_OK;