1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the nng version of the rmr
24 library (formerly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised compile using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
55 #include <nng/protocol/pubsub0/pub.h>
56 #include <nng/protocol/pubsub0/sub.h>
57 #include <nng/protocol/pipeline0/push.h>
58 #include <nng/protocol/pipeline0/pull.h>
61 #include "rmr.h" // things the users see
62 #include "rmr_agnostic.h" // agnostic things (must be included before private)
63 #include "rmr_nng_private.h" // things that we need too
64 #include "rmr_symtab.h"
66 #include "ring_static.c" // message ring support
67 #include "rt_generic_static.c" // route table things not transport specific
68 #include "rtable_nng_static.c" // route table things -- transport specific
69 #include "rtc_static.c" // route table collector
70 #include "tools_static.c"
71 #include "sr_nng_static.c" // send/receive static functions
72 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
75 //------------------------------------------------------------------------------
/*
	Releases storage referenced by a context. The only release that can be
	seen here is that of ctx->rtg_addr.

	ctx -- the context whose storage is being released.

	NOTE(review): ctx is dereferenced below; confirm that a nil ctx is either
	guarded against or never passed by callers.
*/
81 static void free_ctx( uta_ctx_t* ctx ) {
84 free( ctx->rtg_addr ); // free() accepts a nil pointer, so rtg_addr itself needs no guard
89 // --------------- public functions --------------------------------------------------------------------------
92 Returns the size of the payload (bytes) that the msg buffer references.
93 Len in a message is the number of bytes which were received, or should
94 be transmitted, however, it is possible that the mbuf was allocated
95 with a larger payload space than the payload length indicates; this
96 function returns the absolute maximum space that the user has available
97 in the payload. On error (bad msg buffer) -1 is returned and errno should
/*
	Reports how much payload space the message buffer provides: the allocated
	length (alloc_len) less the size of the RMr header (uta_mhdr_t).

	msg -- message buffer to examine. A nil msg, or one with a nil header, takes
	       the error branch guarded by the test below.

	NOTE(review): sizeof yields a size_t, so the subtraction in the final return
	is likely carried out unsigned before conversion to the int return type;
	this assumes alloc_len is never smaller than the header -- TODO confirm.
*/
100 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
101 if( msg == NULL || msg->header == NULL ) { // reject a missing buffer or a buffer without a header
107 return msg->alloc_len - sizeof( uta_mhdr_t ); // usable payload space = allocation less the header
111 Allocates a send message as a zerocopy message allowing the underlying message protocol
112 to send the buffer without copy.
/*
	Allocates a message buffer for the caller by delegating to alloc_zcmsg().

	vctx -- the context; it is cast to uta_ctx_t and tested for nil.
	size -- passed through to alloc_zcmsg() as the third argument.
*/
114 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
118 if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // a nil context cannot be used
122 m = alloc_zcmsg( ctx, NULL, size, 0 ); // the helper does the work; its nil and 0 arguments are defined by alloc_zcmsg itself
127 Return the message to the available pool, or free it outright.
/*
	Releases a message buffer. When the buffer is flagged MFL_ZEROCOPY the
	transport buffer it holds (tp_buf) is handed to nng_msg_free().

	mbuf -- the message buffer being given back.
*/
129 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
135 if( mbuf->flags & MFL_ZEROCOPY ) { // only zero-copy buffers carry an nng message to release
136 //nng_free( (void *) mbuf->header, mbuf->alloc_len );
138 nng_msg_free( mbuf->tp_buf ); // release the nng message referenced by the mbuf
147 send message with maximum timeout.
148 Accept a message and send it to an endpoint based on message type.
149 If NNG reports that the send attempt timed out, or should be retried,
150 RMr will retry for approximately max_to microseconds; rounded to the next
153 Allocates a new message buffer for the next send. If a message type has
154 more than one group of endpoints defined, then the message will be sent
155 in round robin fashion to one endpoint in each group.
157 CAUTION: this is a non-blocking send. If the message cannot be sent, then
158 it will return with an error and errno set to eagain. If the send is
159 a limited fanout, then the returned status is the status of the last
/*
	Sends msg to the endpoint(s) selected by its message type.

	vctx   -- the context (cast to uta_ctx_t); a nil context is rejected with
	          errno set to EINVAL.
	msg    -- message to send; nil is rejected with errno EINVAL, and a message
	          with a nil header is marked RMR_ERR_NOHDR with errno EBADMSG.
	max_to -- retry limit handed to send_msg(); it can be replaced with
	          ctx->send_retries (rmr_send_msg() passes -1 for this parameter).

	Endpoints are picked by uta_epsock_rr(), starting with group 0, and the loop
	repeats for as long as that function sets send_again. When no socket is
	found the message is returned with state RMR_ERR_NOENDPT and errno ENXIO.
	When another send is still to come the message is cloned first, because a
	sent message is no longer available, and the clone becomes the next message
	to send. The message returned carries the state of the last send attempt.
*/
163 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
164 nng_socket nn_sock; // endpoint socket for send
166 int group; // selected group to get socket for
167 int send_again; // true if the message must be sent again
168 rmr_mbuf_t* clone_m; // cloned message for an nth send
169 int sock_ok; // got a valid socket from round robin select
171 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
172 errno = EINVAL; // if msg is null, this is their clue
174 msg->state = RMR_ERR_BADARG;
175 errno = EINVAL; // must ensure it's not eagain
180 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
181 if( msg->header == NULL ) {
182 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
183 msg->state = RMR_ERR_NOHDR;
184 errno = EBADMSG; // must ensure it's not eagain
189 max_to = ctx->send_retries; // convert to retries
192 send_again = 1; // force loop entry
193 group = 0; // always start with group 0
195 while( send_again ) {
196 sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
197 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
198 msg->mtype, send_again, group, msg->len, sock_ok );
// no usable endpoint: report it through the message state and errno
202 msg->state = RMR_ERR_NOENDPT;
203 errno = ENXIO; // must ensure it's not eagain
204 return msg; // caller can resend (maybe) or free
// another send will follow: keep a copy, then send the original without asking for a new buffer
208 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
209 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
210 msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
211 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
214 // error do we need to count successes/errors, how to report some success, esp if last fails?
218 msg = clone_m; // clone will be the next to send
220 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
224 return msg; // last message carries the status of last/only send attempt
228 Send with default max timeout as is set in the context.
229 See rmr_mtosend_msg() for more details on the parameters.
230 See rmr_stimeout() for info on setting the default timeout.
/*
	Convenience form of rmr_mtosend_msg() which supplies -1 for max_to.

	vctx -- the context, passed through unchanged.
	msg  -- the message to send, passed through unchanged.

	Returns whatever rmr_mtosend_msg() returns.
*/
232 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
233 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0: presumably selects the default held in the context -- TODO confirm
237 Return to sender allows a message to be sent back to the endpoint where it originated.
238 The source information in the message is used to select the socket on which to write
239 the message rather than using the message type and round-robin selection. This
240 should return a message buffer with the state of the send operation set. On success
241 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
242 error it can be passed back to this function to retry the send if desired. On error,
243 errno will likely have the failure reason set by the nng send processing.
244 The following are possible values for the state in the message buffer:
246 Message states returned:
247 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
248 RMR_ERR_NOHDR - message did not have a header
249 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
250 RMR_ERR_SENDFAILED - send failed; errno has nano error code
251 RMR_ERR_RETRY - the request failed but should be retried (EAGAIN)
253 A nil message as the return value is rare, and generally indicates some kind of horrible
254 failure. The value of errno might give a clue as to what is wrong.
257 Like send_msg(), this is non-blocking and will return the msg if there is an error.
258 The caller must check for this and handle.
/*
	Return to sender: sends msg back to the endpoint named in the source field
	of its header. The socket is found with uta_epsock_byname() rather than by
	message type.

	vctx -- the context (cast to uta_ctx_t); nil is rejected with errno EINVAL.
	msg  -- message to return; nil is rejected with errno EINVAL, a message with
	        a nil header is marked RMR_ERR_NOHDR, and one whose source cannot be
	        matched to a socket is returned marked RMR_ERR_NOENDPT.

	For the send the source field is overlaid with ctx->my_name. A copy of the
	original source is kept so that it can be written back into the message
	that send_msg() hands back, which is also flagged MFL_ADDSRC.

	NOTE(review): the strdup() result is used without a visible nil check, and
	its release cannot be seen here -- confirm both.
	NOTE(review): strncpy() bounded by RMR_MAX_SID adds no terminator when the
	source string is that long or longer; confirm the field is always terminated.
*/
260 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
261 nng_socket nn_sock; // endpoint socket for send
265 char* hold_src; // we need the original source if send fails
266 int sock_ok; // true if we found a valid endpoint socket
268 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
269 errno = EINVAL; // if msg is null, this is their clue
271 msg->state = RMR_ERR_BADARG;
276 errno = 0; // at this point any bad state is in msg returned
277 if( msg->header == NULL ) {
// NOTE(review): the log text below names rmr_send_msg although this function is rmr_rts_msg
278 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
279 msg->state = RMR_ERR_NOHDR;
283 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // socket of specific endpoint
285 msg->state = RMR_ERR_NOENDPT;
286 return msg; // preallocated msg can be reused since not given back to nn
289 msg->state = RMR_OK; // ensure it is clear before send
290 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
291 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
292 msg = send_msg( ctx, msg, nn_sock, -1 ); // -1 is passed as the retry value
294 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again
295 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
303 Call sends the message based on message routing using the message type, and waits for a
304 response message to arrive with the same transaction id that was in the outgoing message.
305 If, while waiting for the expected response, messages are received which do not have the
306 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
307 order that they were received.
309 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
310 to ensure that no error was encountered. If the state is RMR_ERR_BADARG, then the message
311 may be resent (likely the context pointer was nil). If the message is sent, but no
312 response is received, a nil message is returned with errno set to indicate the likely
314 ETIMEDOUT -- too many messages were queued before receiving the expected response
315 ENOBUFS -- the queued message ring is full, messages were dropped
316 EINVAL -- A parameter was not valid
317 EAGAIN -- the underlying message system was interrupted or the device was busy;
318 user should call this function with the message again.
321 QUESTION: should user specify the number of messages to allow to queue?
/*
	Sends msg with rmr_send_msg() and then waits, through rmr_rcv_specific(),
	for a message whose transaction id matches the one in the outgoing message.

	vctx -- the context (cast to uta_ctx_t); nil is rejected.
	msg  -- the message to send; nil is rejected. Its xaction field is copied
	        before the send so that it can be used as the id to wait for.

	A non-nil message coming back from rmr_send_msg() means the send did not
	complete; unless its state is RMR_ERR_RETRY the state is replaced with
	RMR_ERR_CALLFAILED. Otherwise the result of rmr_rcv_specific() is returned.
*/
323 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
325 unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID
327 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
329 msg->state = RMR_ERR_BADARG;
334 memcpy( expected_id, msg->xaction, RMR_MAX_XID ); // copy exactly RMR_MAX_XID bytes; the extra byte is for the terminator
335 expected_id[RMR_MAX_XID] = 0; // ensure it's a string
336 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
338 msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
340 msg = rmr_send_msg( ctx, msg );
341 if( msg ) { // msg should be nil, if not there was a problem; return buffer to user
342 if( msg->state != RMR_ERR_RETRY ) {
343 msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry
// NOTE(review): the allowance of 20 queued messages is hard coded here
348 return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead
352 The outward facing receive function. When invoked it will pop the oldest message
353 from the receive ring, if any are queued, and return it. If the ring is empty
354 then the receive function is invoked to wait for the next message to arrive (blocking).
356 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
357 nil, a new one will be allocated. However, the caller should NOT expect to get the same
358 struct back (if a queued message is returned the message struct will be different).
/*
	Receives the next message. A message already queued on the context ring
	(ctx->mring) is taken first; when nothing is queued rcv_msg() is called to
	wait for one.

	vctx    -- the context (cast to uta_ctx_t). When it is nil and old_msg is
	           not, old_msg is marked RMR_ERR_BADARG.
	old_msg -- optional buffer from the caller; it is passed to rcv_msg() for
	           reuse, or given to rmr_free_msg() on the queued message path.
*/
360 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
362 rmr_mbuf_t* qm; // message that was queued on the ring
364 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
365 if( old_msg != NULL ) { // the state can only be reported when there is a buffer to carry it
366 old_msg->state = RMR_ERR_BADARG;
373 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
376 rmr_free_msg( old_msg ); // future: push onto a free list???
382 return rcv_msg( ctx, old_msg ); // nothing queued, wait for one
386 This implements a receive with a timeout via epoll. Mostly this is for
387 wrappers as native C applications can use epoll directly and will not have
/*
	Receive with a timeout, implemented with epoll.

	vctx    -- the context (cast to uta_ctx_t). When it is nil and old_msg is
	           not, old_msg is marked RMR_ERR_BADARG.
	old_msg -- optional buffer from the caller; it is given to rmr_free_msg()
	           on the queued message path.
	ms_to   -- handed to epoll_wait() as its timeout argument.

	A message already queued on ctx->mring is taken first. The first time
	through (ctx->eps is nil) an epoll descriptor is created and the receive
	descriptor from rmr_get_rcvfd() is registered for EPOLLIN. After the wait,
	a result of zero or less marks the message RMR_ERR_TIMEOUT and the empty
	message is returned; otherwise rcv_msg() is called and its result returned.

	NOTE(review): eps is dereferenced after malloc() with no nil check visible.
	NOTE(review): nready < 0 (an epoll_wait error) is reported the same way as
	a timeout -- confirm that this is intended.
*/
390 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
391 struct epoll_stuff* eps; // convenience pointer
393 rmr_mbuf_t* qm; // message that was queued on the ring
397 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
398 if( old_msg != NULL ) {
399 old_msg->state = RMR_ERR_BADARG;
405 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
408 rmr_free_msg( old_msg ); // future: push onto a free list???
414 if( (eps = ctx->eps) == NULL ) { // set up epoll on first call
415 eps = malloc( sizeof *eps );
417 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
418 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
423 eps->nng_fd = rmr_get_rcvfd( ctx ); // descriptor that signals a pending receive
424 eps->epe.events = EPOLLIN; // interested in readable only
425 eps->epe.data.fd = eps->nng_fd;
427 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
428 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
439 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
446 nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout
447 if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours
448 msg->state = RMR_ERR_TIMEOUT;
450 return rcv_msg( ctx, msg ); // receive it and return it
453 return msg; // return empty message with state set
457 This blocks until the message with the 'expect' ID is received. Messages which are received
458 before the expected message are queued onto the message ring. The function will return
459 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
460 expected message is received. If the queued message ring fills a nil pointer is returned
461 and errno is set to ENOBUFS.
463 Generally this will be invoked only by the call() function as it waits for a response, but
464 it is exposed to the user application as there is no reason not to.
/*
	Receives until a message whose transaction id begins with expect arrives,
	placing other messages on the context ring (ctx->mring) as it goes.

	vctx        -- the context (cast to uta_ctx_t); nil is rejected.
	msg         -- optional buffer from the caller, handed to rcv_msg().
	expect      -- the id to wait for. When nil or empty this behaves as
	               rmr_rcv_msg(). At most RMR_MAX_XID bytes of it are compared.
	allow2queue -- bound on the receive loop, which runs while queued is
	               less than this value.

	The comparison is a memcmp() over the length of expect (capped at
	RMR_MAX_XID), so it is a prefix match against msg->xaction.

	NOTE(review): the result of rcv_msg() is dereferenced with no nil check
	visible -- confirm that rcv_msg() never returns nil.
	NOTE(review): the debug output prints msg->xaction with %s; rmr_call()
	terminates its own copy of the id, which suggests the field may not be a
	terminated string -- confirm.
*/
466 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
468 int queued = 0; // number we pushed into the ring
469 int exp_len = 0; // length of expected ID
471 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
473 msg->state = RMR_ERR_BADARG;
481 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
482 return rmr_rcv_msg( ctx, msg );
485 exp_len = strlen( expect );
486 if( exp_len > RMR_MAX_XID ) { // never compare more than the id field can hold
487 exp_len = RMR_MAX_XID;
489 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
491 while( queued < allow2queue ) {
492 msg = rcv_msg( ctx, msg ); // hard wait for next
493 if( msg->state == RMR_OK ) {
494 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
495 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
499 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
500 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
505 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
// reached when the loop ends without a match
511 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
516 // CAUTION: these are not supported as they must be set differently (between create and open) in NNG.
517 // until those details are worked out, these generate a warning.
519 Set send timeout. The value time is assumed to be microseconds. The timeout is the
520 rough maximum amount of time that RMr will block on a send attempt when the underlying
521 mechanism indicates eagain or etimedout. All other error conditions are reported
522 without this delay. Setting a timeout of 0 causes no retries to be attempted in
523 RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
524 but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
525 after every 10K send attempts until the time value is reached. Retries are abandoned
526 if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
528 The default, if this function is not used, is 1; meaning that RMr will retry, but will
529 not enter a sleep. In all cases the caller should check the status in the message returned
532 Returns -1 if the context was invalid; RMR_OK otherwise.
/*
	Records the send timeout for the context by storing time in
	ctx->send_retries.

	vctx -- the context (cast to uta_ctx_t); nil is rejected.
	time -- the value to store; it is copied as is.
*/
534 extern int rmr_set_stimeout( void* vctx, int time ) {
537 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
545 ctx->send_retries = time; // same field that init() defaults to 1
550 Set receive timeout -- not supported in nng implementation
/*
	Receive timeout setter kept for interface compatibility: this
	implementation only writes a warning to stderr saying that a receive
	timeout cannot be set. Neither vctx nor time is used by the visible code.
*/
552 extern int rmr_set_rtimeout( void* vctx, int time ) {
553 fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
559 This is the actual init workhorse. The user visible function merely ensures that the
560 calling programme does NOT set any internal flags that are supported, and then
561 invokes this. Internal functions (the route table collector) which need additional
562 open ports without starting additional route table collectors, will invoke this
563 directly with the proper flag.
/*
	Builds a context. The visible steps are: allocate and zero the uta_ctx_t,
	set the default retry count and the message ring, work out the maximum
	payload and message lengths, open the nng pull socket, split the
	proto/port string, build the registered name (host:port) from the short
	host name, collect the local IP list, start the nng listener and, unless
	FL_NOTHREAD is set in flags, start the route table collector thread.

	uproto_port  -- proto:port, :port or just port. DEF_COMM_PORT is used when
	                this is nil. A private copy is made so it can be modified.
	max_msg_size -- when greater than zero and not larger than the default
	                maximum it replaces ctx->max_plen; a larger value only
	                produces a warning and the default stays in place.
	flags        -- FL_NOTHREAD suppresses the collector thread.

	NOTE(review): the strdup() copies of the port string are used without a
	visible nil check, and their release cannot be seen here.
	NOTE(review): sizeof( uta_mhdr_t ) is added when the default max_plen is
	set and again when max_mlen is computed; confirm the header is meant to
	be counted twice in the default case.
*/
565 static void* init( char* uproto_port, int max_msg_size, int flags ) {
566 static int announced = 0; // persists across calls; presumably limits the banner below to one print -- TODO confirm
567 uta_ctx_t* ctx = NULL;
568 char bind_info[NNG_MAXADDRLEN]; // bind info
569 char* proto = "tcp"; // pointer into the proto/port string user supplied
571 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
573 char wbuf[1024]; // work buffer
574 char* tok; // pointer at token in a buffer
578 fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n",
579 QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
584 if( uproto_port == NULL ) {
585 proto_port = strdup( DEF_COMM_PORT ); // nothing supplied, fall back to the default
587 proto_port = strdup( uproto_port ); // so we can modify it
590 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
594 memset( ctx, 0, sizeof( uta_ctx_t ) ); // every field starts out zero/nil
596 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
597 ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
599 ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
600 if( max_msg_size > 0 ) {
601 if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
602 ctx->max_plen = max_msg_size;
604 fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
608 ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
610 // we're using a listener to get rtg updates, so we do NOT need this.
611 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
// NOTE(review): the failure report below prints errno, while the nng_listen failure
// further down reports through nng_strerror( state ); errno may not be set by nng -- confirm
613 if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode
614 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
619 if( (port = strchr( proto_port, ':' )) != NULL ) {
620 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
623 *(port++) = 0; // term proto string and point at port string
624 proto = proto_port; // user supplied proto so point at it rather than default
627 port = proto_port; // assume something like "1234" was passed
630 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
631 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
634 if( (tok = strchr( wbuf, '.' )) != NULL ) {
635 *tok = 0; // we don't keep domain portion
// NOTE(review): the buffer from malloc() is written by snprintf() with no nil check visible
637 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
638 if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port
639 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
643 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
647 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
648 interface = "0.0.0.0";
650 // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
651 // rather than using this generic listen() call.
652 snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
653 if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
654 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
655 nng_close( ctx->nn_sock ); // the socket is closed when the listener cannot be started
660 if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc
// NOTE(review): pthread_create() reports failure through its return value, so strerror( errno )
// may not describe the failure; the message also has no trailing newline
661 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
662 fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
671 Initialise the message routing environment. Flags are one of the UTAFL_
672 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
673 (tcp) to be used, then :port is all that is needed.
675 At the moment it seems that TCP really is the only viable protocol, but
676 we'll allow flexibility.
678 The return value is a void pointer which must be passed to most uta functions. On
679 error, a nil pointer is returned and errno should be set.
682 No user flags supported (needed) at the moment, but this provides for extension
683 without drastically changing anything. The user should invoke with RMRFL_NONE to
684 avoid any misbehaviour as there are internal flags which are supported
/*
	User visible initialisation: hands the request to init() after masking
	flags with UFL_MASK, and returns what init() returns.

	uproto_port  -- passed to init() unchanged.
	max_msg_size -- passed to init() unchanged.
	flags        -- only the bits inside UFL_MASK are passed on.
*/
686 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
687 return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
691 Return true if routing table is initialised etc. and app can send/receive.
/*
	Readiness test for the context: the visible checks are that the context
	itself is not nil and that its route table pointer (ctx->rtable) is set.

	vctx -- the context (cast to uta_ctx_t).
*/
693 extern int rmr_ready( void* vctx ) {
696 if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // no context, so nothing can be ready
700 if( ctx->rtable != NULL ) { // a route table has been installed
708 Returns a file descriptor which can be used with epoll() to signal a receive
709 pending. The file descriptor should NOT be read from directly, nor closed, as NNG
710 does not support this.
/*
	Fetches the receive file descriptor of the context socket by asking nng
	for the NNG_OPT_RECVFD option. rmr_torcv_msg() registers this descriptor
	with epoll.

	vctx -- the context (cast to uta_ctx_t); nil is rejected.

	When nng reports an error, its text is written to stderr.
*/
712 extern int rmr_get_rcvfd( void* vctx ) {
717 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
721 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { // nng returns nonzero on failure
722 fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) );
733 There isn't an nng_flush() per se, but we can pause, generate
734 a context switch, which should allow the last sent buffer to
735 flow. There isn't exactly an nng_term/close either, so there
736 isn't much we can do.
/*
	Closes the nng socket held by the context.

	vctx -- the context (cast to uta_ctx_t); nil is tested for before the
	        socket is touched.

	NOTE(review): only the socket close is visible here; whether the rest of
	the context is released by this function cannot be told from these lines.
*/
738 extern void rmr_close( void* vctx ) {
741 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
746 nng_close( ctx->nn_sock );