1 // vim: ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019-2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the nng version of the rmr
24 library (formarly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised comple using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
53 #include <semaphore.h>
57 #include <nng/protocol/pubsub0/pub.h>
58 #include <nng/protocol/pubsub0/sub.h>
59 #include <nng/protocol/pipeline0/push.h>
60 #include <nng/protocol/pipeline0/pull.h>
63 #include "rmr.h" // things the users see
64 #include "rmr_agnostic.h" // agnostic things (must be included before private)
65 #include "rmr_nng_private.h" // things that we need too
66 #include "rmr_symtab.h"
67 #include "rmr_logging.h"
69 #include "ring_static.c" // message ring support
70 #include "rt_generic_static.c" // route table things not transport specific
71 #include "rtable_nng_static.c" // route table things -- transport specific
72 #include "rtc_static.c" // route table collector
73 #include "tools_static.c"
74 #include "sr_nng_static.c" // send/receive static functions
75 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
76 #include "mt_call_static.c"
77 #include "mt_call_nng_static.c"
80 //------------------------------------------------------------------------------
86 static void free_ctx( uta_ctx_t* ctx ) {
89 free( ctx->rtg_addr );
94 // --------------- public functions --------------------------------------------------------------------------
97 Returns the size of the payload (bytes) that the msg buffer references.
98 Len in a message is the number of bytes which were received, or should
99 be transmitted, however, it is possible that the mbuf was allocated
100 with a larger payload space than the payload length indicates; this
101 function returns the absolute maximum space that the user has available
102 in the payload. On error (bad msg buffer) -1 is returned and errno should
105 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
106 if( msg == NULL || msg->header == NULL ) {
112 return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits
116 Allocates a send message as a zerocopy message allowing the underlying message protocol
117 to send the buffer without copy.
119 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
123 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
127 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
133 Allocates a send message as a zerocopy message allowing the underlying message protocol
134 to send the buffer without copy. In addition, a trace data field of tr_size will be
135 added and the supplied data coppied to the buffer before returning the message to
138 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
143 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
147 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
149 state = rmr_set_trace( m, data, tr_size ); // roll their data in
150 if( state != tr_size ) {
151 m->state = RMR_ERR_INITFAILED;
159 This provides an external path to the realloc static function as it's called by an
160 outward facing mbuf api function. Used to reallocate a message with a different
163 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
164 return realloc_msg( msg, new_tr_size );
169 Return the message to the available pool, or free it outright.
171 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
177 if( mbuf->flags & MFL_ZEROCOPY ) {
178 //nng_free( (void *) mbuf->header, mbuf->alloc_len );
180 nng_msg_free( mbuf->tp_buf );
189 This is a wrapper to the real timeout send. We must wrap it now to ensure that
190 the call flag and call-id are reset
192 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
193 char* d1; // point at the call-id in the header
196 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
198 d1 = DATA1_ADDR( msg->header );
199 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
202 return mtosend_msg( vctx, msg, max_to );
206 Send with default max timeout as is set in the context.
207 See rmr_mtosend_msg() for more details on the parameters.
208 See rmr_stimeout() for info on setting the default timeout.
210 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
211 char* d1; // point at the call-id in the header
214 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
216 d1 = DATA1_ADDR( msg->header );
217 d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
220 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
224 Return to sender allows a message to be sent back to the endpoint where it originated.
225 The source information in the message is used to select the socket on which to write
226 the message rather than using the message type and round-robin selection. This
227 should return a message buffer with the state of the send operation set. On success
228 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
229 error it can be passed back to this function to retry the send if desired. On error,
230 errno will liklely have the failure reason set by the nng send processing.
231 The following are possible values for the state in the message buffer:
233 Message states returned:
234 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
235 RMR_ERR_NOHDR - message did not have a header
236 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
237 RMR_ERR_SENDFAILED - send failed; errno has nano error code
238 RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN)
240 A nil message as the return value is rare, and generally indicates some kind of horrible
241 failure. The value of errno might give a clue as to what is wrong.
244 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
245 The caller must check for this and handle.
247 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
248 nng_socket nn_sock; // endpoint socket for send
251 char* hold_src; // we need the original source if send fails
252 char* hold_ip; // also must hold original ip
253 int sock_ok = 0; // true if we found a valid endpoint socket
254 endpoint_t* ep; // end point to track counts
256 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
257 errno = EINVAL; // if msg is null, this is their clue
259 msg->state = RMR_ERR_BADARG;
260 msg->tp_state = errno;
265 errno = 0; // at this point any bad state is in msg returned
266 if( msg->header == NULL ) {
267 rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
268 msg->state = RMR_ERR_NOHDR;
269 msg->tp_state = errno;
273 ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
275 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts
277 if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
278 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
281 msg->state = RMR_ERR_NOENDPT;
282 return msg; // preallocated msg can be reused since not given back to nn
286 msg->state = RMR_OK; // ensure it is clear before send
287 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
288 hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
289 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
290 msg = send_msg( ctx, msg, nn_sock, -1 );
293 switch( msg->state ) {
295 ep->scounts[EPSC_GOOD]++;
299 ep->scounts[EPSC_TRANS]++;
303 ep->scounts[EPSC_FAIL]++;
307 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
308 strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
309 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
318 If multi-threading call is turned on, this invokes that mechanism with the special call
319 id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
320 behavour (described below) is carried out. This is safe to use when mt is enabled, but
321 the user app is invoking rmr_call() from only one thread, and the caller doesn't need
324 On timeout this function will return a nil pointer. If the original message could not
325 be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
328 Call sends the message based on message routing using the message type, and waits for a
329 response message to arrive with the same transaction id that was in the outgoing message.
330 If, while wiating for the expected response, messages are received which do not have the
331 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
332 order that they were received.
334 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
335 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
336 may be resent (likely the context pointer was nil). If the message is sent, but no
337 response is received, a nil message is returned with errno set to indicate the likley
339 ETIMEDOUT -- too many messages were queued before reciving the expected response
340 ENOBUFS -- the queued message ring is full, messages were dropped
341 EINVAL -- A parameter was not valid
342 EAGAIN -- the underlying message system wsa interrupted or the device was busy;
343 user should call this function with the message again.
346 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
348 unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID
350 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
352 msg->state = RMR_ERR_BADARG;
357 if( ctx->flags & CFL_MTC_ENABLED ) { // if multi threaded call is on, use that
358 return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
361 memcpy( expected_id, msg->xaction, RMR_MAX_XID );
362 expected_id[RMR_MAX_XID] = 0; // ensure it's a string
363 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rmr_call is making call, waiting for (%s)\n", expected_id );
365 msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
367 msg = rmr_send_msg( ctx, msg );
368 if( msg ) { // msg should be nil, if not there was a problem; return buffer to user
369 if( msg->state != RMR_ERR_RETRY ) {
370 msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry
372 msg->tp_state = errno;
376 return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead
380 The outward facing receive function. When invoked it will pop the oldest message
381 from the receive ring, if any are queued, and return it. If the ring is empty
382 then the receive function is invoked to wait for the next message to arrive (blocking).
384 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
385 nil, a new one will be allocated. However, the caller should NOT expect to get the same
386 struct back (if a queued message is returned the message struct will be different).
388 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
390 rmr_mbuf_t* qm; // message that was queued on the ring
392 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
394 if( old_msg != NULL ) {
395 old_msg->state = RMR_ERR_BADARG;
396 old_msg->tp_state = errno;
402 if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first
403 return rmr_mt_rcv( ctx, old_msg, -1 );
406 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
409 rmr_free_msg( old_msg ); // future: push onto a free list???
415 return rcv_msg( ctx, old_msg ); // nothing queued, wait for one
419 This implements a receive with a timeout via epoll. Mostly this is for
420 wrappers as native C applications can use epoll directly and will not have
423 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
424 struct epoll_stuff* eps; // convience pointer
426 rmr_mbuf_t* qm; // message that was queued on the ring
430 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
432 if( old_msg != NULL ) {
433 old_msg->state = RMR_ERR_BADARG;
434 old_msg->tp_state = errno;
439 if( ctx->flags & CFL_MTC_ENABLED ) { // must pop from ring with a semaphore dec first
440 return rmr_mt_rcv( ctx, old_msg, ms_to );
443 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
446 rmr_free_msg( old_msg ); // future: push onto a free list???
452 if( (eps = ctx->eps) == NULL ) { // set up epoll on first call
453 eps = malloc( sizeof *eps );
455 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
456 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
459 if( old_msg != NULL ) {
460 old_msg->state = RMR_ERR_INITFAILED;
461 old_msg->tp_state = errno;
466 eps->nng_fd = rmr_get_rcvfd( ctx );
467 eps->epe.events = EPOLLIN;
468 eps->epe.data.fd = eps->nng_fd;
470 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
471 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
474 if( old_msg != NULL ) {
475 old_msg->state = RMR_ERR_INITFAILED;
476 old_msg->tp_state = errno;
487 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
494 nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout
495 if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours
496 msg->state = RMR_ERR_TIMEOUT;
497 msg->tp_state = errno;
499 return rcv_msg( ctx, msg ); // receive it and return it
502 return msg; // return empty message with state set
506 This blocks until the message with the 'expect' ID is received. Messages which are received
507 before the expected message are queued onto the message ring. The function will return
508 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
509 expected message is received. If the queued message ring fills a nil pointer is returned
510 and errno is set to ENOBUFS.
512 Generally this will be invoked only by the call() function as it waits for a response, but
513 it is exposed to the user application as three is no reason not to.
515 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
517 int queued = 0; // number we pushed into the ring
518 int exp_len = 0; // length of expected ID
520 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
523 msg->state = RMR_ERR_BADARG;
524 msg->tp_state = errno;
531 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
532 return rmr_rcv_msg( ctx, msg );
535 exp_len = strlen( expect );
536 if( exp_len > RMR_MAX_XID ) {
537 exp_len = RMR_MAX_XID;
539 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific waiting for id=%s\n", expect );
541 while( queued < allow2queue ) {
542 msg = rcv_msg( ctx, msg ); // hard wait for next
543 if( msg->state == RMR_OK ) {
544 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
545 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
549 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
550 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific ring is full\n" );
555 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific queued message type=%d\n", msg->mtype );
561 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific timeout waiting for %s\n", expect );
567 Set send timeout. The value time is assumed to be milliseconds. The timeout is the
568 _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
569 mechnism indicates eagain or etimeedout. All other error conditions are reported
570 without this delay. Setting a timeout of 0 causes no retries to be attempted in
571 RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
572 but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
573 after every 1K send attempts until the "time" value is reached. Retries are abandoned
574 if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
576 The default, if this function is not used, is 1; meaning that RMr will retry, but will
577 not enter a sleep. In all cases the caller should check the status in the message returned
580 Returns -1 if the context was invalid; RMR_OK otherwise.
582 extern int rmr_set_stimeout( void* vctx, int time ) {
585 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
593 ctx->send_retries = time;
598 Set receive timeout -- not supported in nng implementation
600 CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
602 extern int rmr_set_rtimeout( void* vctx, int time ) {
603 rmr_vlog( RMR_VL_WARN, "Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
609 This is the actual init workhorse. The user visible function meerly ensures that the
610 calling programme does NOT set any internal flags that are supported, and then
611 invokes this. Internal functions (the route table collector) which need additional
612 open ports without starting additional route table collectors, will invoke this
613 directly with the proper flag.
615 static void* init( char* uproto_port, int max_msg_size, int flags ) {
616 static int announced = 0;
617 uta_ctx_t* ctx = NULL;
618 char bind_info[NNG_MAXADDRLEN]; // bind info
619 char* proto = "tcp"; // pointer into the proto/port string user supplied
621 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
623 char wbuf[1024]; // work buffer
624 char* tok; // pointer at token in a buffer
629 old_vlevel = rmr_vlog_init(); // initialise and get the current level
630 rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
633 rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
634 RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
637 rmr_set_vlevel( old_vlevel ); // return logging to the desired state
640 if( uproto_port == NULL ) {
641 proto_port = strdup( DEF_COMM_PORT );
643 proto_port = strdup( uproto_port ); // so we can modify it
646 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
650 memset( ctx, 0, sizeof( uta_ctx_t ) );
652 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
653 ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
655 if( flags & RMRFL_MTCALL ) { // mt call support is on, need bigger ring
656 ctx->mring = uta_mk_ring( 2048 ); // message ring filled by rcv thread
657 init_mtcall( ctx ); // set up call chutes
659 ctx->mring = uta_mk_ring( 128 ); // ring filled only on blocking call
662 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
663 if( max_msg_size > 0 ) {
664 ctx->max_plen = max_msg_size;
667 // we're using a listener to get rtg updates, so we do NOT need this.
668 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
670 if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode
671 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
676 if( (port = strchr( proto_port, ':' )) != NULL ) {
677 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
680 *(port++) = 0; // term proto string and point at port string
681 proto = proto_port; // user supplied proto so point at it rather than default
684 port = proto_port; // assume something like "1234" was passed
687 if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
688 tok = strdup( tok ); // something we can destroy
689 if( *tok == '[' ) { // we allow an ipv6 address here
690 tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
692 tok2 = strchr( tok, ':' ); // find :port if there so we can chop
694 if( tok2 && *tok2 ) { // if it's not the end of string marker
695 *tok2 = 0; // make it so
698 snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
701 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
702 rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
705 if( (tok = strchr( wbuf, '.' )) != NULL ) {
706 *tok = 0; // we don't keep domain portion
710 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
711 if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
712 rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
716 if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
717 if( atoi( tok ) > 0 ) {
718 flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages
722 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
723 if( flags & RMRFL_NAME_ONLY ) {
724 ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
726 ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
727 if( ctx->my_ip == NULL ) {
728 rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
729 strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
732 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
734 if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
736 ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
741 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
742 interface = "0.0.0.0";
744 // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
745 // rather than using this generic listen() call.
746 snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
747 if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
748 rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
749 nng_close( ctx->nn_sock );
754 if( flags & FL_NOTHREAD ) { // if no rtc thread, we still need an empty route table for wormholes
755 ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // so create one
757 if( (tok = getenv( ENV_RTG_RAW )) != NULL && *tok == '0' ) { // use RMR for Rmgr comm only when specifically off
758 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rmr based rt collector thread
759 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
762 if( pthread_create( &ctx->rtc_th, NULL, raw_rtc, (void *) ctx ) ) { // kick the raw msg rt collector thread
763 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
768 if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running
769 ctx->flags |= CFL_MTC_ENABLED;
770 if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver
771 rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
781 Initialise the message routing environment. Flags are one of the UTAFL_
782 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
783 (tcp) to be used, then :port is all that is needed.
785 At the moment it seems that TCP really is the only viable protocol, but
786 we'll allow flexibility.
788 The return value is a void pointer which must be passed to most uta functions. On
789 error, a nil pointer is returned and errno should be set.
792 No user flags supported (needed) at the moment, but this provides for extension
793 without drastically changing anything. The user should invoke with RMRFL_NONE to
794 avoid any misbehavour as there are internal flags which are suported
796 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
797 return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
801 This sets the default trace length which will be added to any message buffers
802 allocated. It can be set at any time, and if rmr_set_trace() is given a
803 trace len that is different than the default allcoated in a message, the message
806 Returns 0 on failure and 1 on success. If failure, then errno will be set.
808 extern int rmr_init_trace( void* vctx, int tr_len ) {
812 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
817 ctx->trace_data_len = tr_len;
822 Return true if routing table is initialised etc. and app can send/receive.
824 extern int rmr_ready( void* vctx ) {
827 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
831 if( ctx->rtable != NULL ) {
839 Returns a file descriptor which can be used with epoll() to signal a receive
840 pending. The file descriptor should NOT be read from directly, nor closed, as NNG
841 does not support this.
843 extern int rmr_get_rcvfd( void* vctx ) {
848 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
852 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
853 rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
864 There isn't an nng_flush() per se, but we can pause, generate
865 a context switch, which should allow the last sent buffer to
866 flow. There isn't exactly an nng_term/close either, so there
867 isn't much we can do.
869 extern void rmr_close( void* vctx ) {
872 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
877 nng_close( ctx->nn_sock );
881 // ----- multi-threaded call/receive support -------------------------------------------------
884 Blocks on the receive ring chute semaphore and then reads from the ring
885 when it is tickled. If max_wait is -1 then the function blocks until
886 a message is ready on the ring. Else max_wait is assumed to be the number
887 of millaseconds to wait before returning a timeout message.
889 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
891 uta_mhdr_t* hdr; // header in the transport buffer
893 struct timespec ts; // time info if we have a timeout
894 long new_ms; // adjusted mu-sec
895 long seconds = 0; // max wait seconds
896 long nano_sec; // max wait xlated to nano seconds
898 rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
900 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
903 mbuf->state = RMR_ERR_BADARG;
904 mbuf->tp_state = errno;
909 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
912 mbuf->state = RMR_ERR_NOTSUPP;
913 mbuf->tp_state = errno;
920 ombuf->state = RMR_ERR_TIMEOUT; // preset if for failure
924 chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
926 if( max_wait >= 0 ) {
927 clock_gettime( CLOCK_REALTIME, &ts );
929 if( max_wait > 999 ) {
930 seconds = max_wait / 1000;
931 max_wait -= seconds * 1000;
932 ts.tv_sec += seconds;
935 nano_sec = max_wait * 1000000;
936 ts.tv_nsec += nano_sec;
937 if( ts.tv_nsec > 999999999 ) {
938 ts.tv_nsec -= 999999999;
943 seconds = 1; // use as flag later to invoked timed wait
948 while( state < 0 && errno == EINTR ) {
950 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
952 state = sem_wait( &chute->barrier );
957 mbuf = ombuf; // return caller's buffer if they passed one in
959 errno = 0; // interrupted call state could be left; clear
960 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_rcv extracting from normal ring\n" );
961 if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
962 mbuf->state = RMR_OK;
965 rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
969 mbuf = ombuf; // no buffer, return user's if there
974 mbuf->tp_state = errno;
980 Accept a message buffer and caller ID, send the message and then wait
981 for the receiver to tickle the semaphore letting us know that a message
982 has been received. The call_id is a value between 2 and 255, inclusive; if
983 it's not in this range an error will be returned. Max wait is the amount
984 of time in millaseconds that the call should block for. If 0 is given
985 then no timeout is set.
987 If the mt_call feature has not been initialised, then the attempt to use this
988 funciton will fail with RMR_ERR_NOTSUPP
990 If no matching message is received before the max_wait period expires, a
991 nil pointer is returned, and errno is set to ETIMEOUT. If any other error
992 occurs after the message has been sent, then a nil pointer is returned
993 with errno set to some other value.
995 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
996 rmr_mbuf_t* ombuf; // original mbuf passed in
998 uta_mhdr_t* hdr; // header in the transport buffer
1000 unsigned char* d1; // d1 data in header
1001 struct timespec ts; // time info if we have a timeout
1002 long new_ms; // adjusted mu-sec
1003 long seconds = 0; // max wait seconds
1004 long nano_sec; // max wait xlated to nano seconds
1008 if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
1010 mbuf->tp_state = errno;
1011 mbuf->state = RMR_ERR_BADARG;
1016 if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
1017 mbuf->state = RMR_ERR_NOTSUPP;
1018 mbuf->tp_state = errno;
1022 if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
1023 mbuf->state = RMR_ERR_BADARG;
1024 mbuf->tp_state = errno;
1028 ombuf = mbuf; // save to return timeout status with
1030 chute = &ctx->chutes[call_id];
1031 if( chute->mbuf != NULL ) { // probably a delayed message that wasn't dropped
1032 rmr_free_msg( chute->mbuf );
1036 hdr = (uta_mhdr_t *) mbuf->header;
1037 hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
1038 memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
1039 d1 = DATA1_ADDR( hdr );
1040 d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
1041 mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
1043 if( max_wait >= 0 ) {
1044 clock_gettime( CLOCK_REALTIME, &ts );
1046 if( max_wait > 999 ) {
1047 seconds = max_wait / 1000;
1048 max_wait -= seconds * 1000;
1049 ts.tv_sec += seconds;
1051 if( max_wait > 0 ) {
1052 nano_sec = max_wait * 1000000;
1053 ts.tv_nsec += nano_sec;
1054 if( ts.tv_nsec > 999999999 ) {
1055 ts.tv_nsec -= 999999999;
1060 seconds = 1; // use as flag later to invoked timed wait
1063 mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
1065 if( mbuf->state != RMR_OK ) {
1066 mbuf->tp_state = errno;
1067 return mbuf; // timeout or unable to connect or no endpoint are most likely issues
1073 while( chute->mbuf == NULL && ! errno ) {
1075 state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
1077 state = sem_wait( &chute->barrier );
1080 if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
1084 if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
1085 if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
1086 rmr_free_msg( chute->mbuf );
1094 return NULL; // leave errno as set by sem wait call
1098 mbuf->state = RMR_OK;
1105 Given an existing message buffer, reallocate the payload portion to
1106 be at least new_len bytes. The message header will remain such that
1107 the caller may use the rmr_rts_msg() function to return a payload
1110 The mbuf passed in may or may not be reallocated and the caller must
1111 use the returned pointer and should NOT assume that it can use the
1112 pointer passed in with the exceptions based on the clone flag.
1114 If the clone flag is set, then a duplicated message, with larger payload
1115 size, is allocated and returned. The old_msg pointer in this situation is
1116 still valid and must be explicitly freed by the application. If the clone
1117 message is not set (0), then any memory management of the old message is
1118 handled by the function.
1120 If the copy flag is set, the contents of the old message's payload is
1121 copied to the reallocated payload. If the flag is not set, then the
1122 contents of the payload is undetermined.
1124 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
1125 if( old_msg == NULL ) {
1129 return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
1133 The following functions are "dummies" as NNG has no concept of supporting
1134 them, but are needed to resolve calls at link time.
1137 extern void rmr_set_fack( void* p ) {