3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: The bulk of the ric message routing library which is built upon
24 the older nanomsg messaging transport mechanism.
26 To "hide" internal functions the choice was made to implement them
27 all as static functions. This means that we include nearly
28 all of our modules here as 90% of the library is not visible to
31 Author: E. Scott Daniels
32 Date: 28 November 2018
46 #include <arpa/inet.h>
48 #include <nanomsg/nn.h>
49 #include <nanomsg/tcp.h>
50 #include <nanomsg/pair.h>
51 #include <nanomsg/pipeline.h>
52 #include <nanomsg/pubsub.h>
54 #include "rmr.h" // things the users see
55 #include "rmr_agnostic.h" // headers agnostic to the underlying transport mechanism
56 #include "rmr_private.h" // things that we need too
57 #include "rmr_symtab.h"
59 #include "ring_static.c" // message ring support
60 #include "rt_generic_static.c" // generic route table (not nng/nano specific)
61 #include "rtable_static.c" // route table things (nano specific)
62 #include "rtc_static.c" // common rt collector
63 #include "tools_static.c"
64 #include "sr_static.c" // send/receive static functions
65 #include "wormholes.c" // external wormhole api, and it's static functions (must be LAST)
67 // ------------------------------------------------------------------------------------------------------
// Release memory that hangs off of a context structure.
// The only release visible in this view is the free() of ctx->rtg_addr.
// NOTE(review): the lines between the signature and the free() are not visible here,
// so whether ctx (or rtg_addr) is tested for nil first must be confirmed.
72 static void free_ctx( uta_ctx_t* ctx ) {
75 free( ctx->rtg_addr );
80 // --------------- public functions --------------------------------------------------------------------------
83 Set the receive timeout to time. If time >1000 we assume the time is milliseconds,
84 else we assume seconds. Setting -1 is always block.
85 Returns the nn value (0 on success <0 on error).
// Apply a receive timeout (NN_RCVTIMEO) to the nanomsg socket held in the context.
//   vctx - context pointer (cast to uta_ctx_t*)
//   time - timeout value supplied by the caller
// The value returned by nn_setsockopt() is handed straight back to the caller.
87 extern int rmr_set_rtimeout( void* vctx, int time ) {
// the cast and the nil test are combined so a missing context is caught up front
90 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// NOTE(review): the condition which guards this scaling is not visible in this view;
// confirm that it leaves -1 (block forever) and millisecond values untouched.
97 time = time * 1000; // assume seconds, nn wants ms
101 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
/*
	Deprecated -- use rmr_set_rtimeout().

	Kept so that existing callers continue to link. The context and timeout
	are forwarded unchanged to rmr_set_rtimeout() and its result is returned
	(the nn value: 0 on success, <0 on error).
*/
extern int rmr_set_rtimeout( void* vctx, int time );		// same signature as the definition; keeps the forward below declared

extern int rmr_rcv_to( void* vctx, int time ) {
	return rmr_set_rtimeout( vctx, time );		// must forward; calling rmr_rcv_to() here would recurse without end
}
113 Set the send timeout to time. If time >1000 we assume the time is milliseconds,
114 else we assume seconds. Setting -1 is always block.
115 Returns the nn value (0 on success <0 on error).
// Apply a send timeout (NN_SNDTIMEO) to the nanomsg socket held in the context.
//   vctx - context pointer (cast to uta_ctx_t*)
//   time - timeout value supplied by the caller
// The value returned by nn_setsockopt() is handed straight back to the caller.
117 extern int rmr_set_stimeout( void* vctx, int time ) {
// the cast and the nil test are combined so a missing context is caught up front
120 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// NOTE(review): the condition which guards this scaling is not visible in this view;
// confirm that it leaves -1 (block forever) and millisecond values untouched.
127 time = time * 1000; // assume seconds, nn wants ms
131 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
/*
	Deprecated -- use rmr_set_stimeout().

	Kept so that existing callers continue to link. The context and timeout
	are forwarded unchanged to rmr_set_stimeout() and its result is returned
	(the nn value: 0 on success, <0 on error).
*/
extern int rmr_set_stimeout( void* vctx, int time );		// same signature as the definition; keeps the forward below declared

extern int rmr_send_to( void* vctx, int time ) {
	return rmr_set_stimeout( vctx, time );		// must forward; calling rmr_send_to() here would recurse without end
}
142 Returns the size of the payload (bytes) that the msg buffer references.
143 Len in a message is the number of bytes which were received, or should
144 be transmitted, however, it is possible that the mbuf was allocated
145 with a larger payload space than the payload length indicates; this
146 function returns the absolute maximum space that the user has available
147 in the payload. On error (bad msg buffer) -1 is returned and errno should
// Report the payload space available in a message buffer.
//   msg - message buffer to examine
// The result is the buffer's allocated length less the header length that
// RMR_HDR_LEN() reports for the message's header.
150 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
// both the mbuf and its header are needed by the computation below
// NOTE(review): the body of this guard is not visible here; the function comment says -1 is returned
151 if( msg == NULL || msg->header == NULL ) {
157 return msg->alloc_len - RMR_HDR_LEN( msg->header ); // transport buffer less header and other data bits
161 Allocates a send message as a zerocopy message allowing the underlying message protocol
162 to send the buffer without copy.
// Allocate a message buffer which the caller can fill in and send.
//   vctx - context pointer (cast to uta_ctx_t*)
//   size - passed through to alloc_zcmsg() as the requested size
// NOTE(review): the return statements are not visible in this view; presumably m is
// returned on success and nil when the context is missing -- confirm.
164 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
168 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// NOTE(review): the NULL second argument presumably means "no existing mbuf to reuse" and the
// trailing 0 an initial state; confirm against alloc_zcmsg()
172 m = alloc_zcmsg( ctx, NULL, size, 0 );
177 Return the message to the available pool, or free it outright.
// Release the storage referenced by a message buffer.
//   mbuf - message buffer to release
// NOTE(review): any nil test on mbuf and the release of the mbuf struct itself are not
// visible in this view -- confirm before relying on either.
179 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
// headers flagged as zero-copy are handed to nn_freemsg() rather than free()
185 if( mbuf->flags & MFL_ZEROCOPY ) {
186 nn_freemsg( mbuf->header ); // must let nano free it
// NOTE(review): presumably the alternative (not zero-copy) branch; the branch line itself is not visible here
188 free( mbuf->header );
196 Accept a message and send it to an endpoint based on message type.
197 Allocates a new message buffer for the next send. If a message type has
198 more than one group of endpoints defined, then the message will be sent
199 in round robin fashion to one endpoint in each group.
201 CAUTION: this is a non-blocking send. If the message cannot be sent, then
202 it will return with an error and errno set to eagain. If the send is
203 a limited fanout, then the returned status is the status of the last
// Send a message to an endpoint selected by message type.
//   vctx - context pointer (cast to uta_ctx_t*)
//   msg  - message to send; msg->mtype drives endpoint selection
// Endpoints are selected per group with uta_epsock_rr(); while it reports that another
// group remains (send_again) the message is cloned before each send so that a copy is
// still available for the next group. The message returned carries the state of the
// last (or only) send attempt.
// NOTE(review): several lines of this function (tests on nn_sock and on the send result,
// the group increment, closing braces) are not visible in this view; the comments below
// describe only what can be seen.
206 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
207 int nn_sock; // endpoint socket for send
209 int group; // selected group to get socket for
210 int send_again; // true if the message must be sent again
211 rmr_mbuf_t* clone_m; // cloned message for an nth send
213 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
214 errno = EINVAL; // if msg is null, this is their clue
// NOTE(review): msg may be nil on this path; the guard expected around the next statement is not visible here
216 msg->state = RMR_ERR_BADARG;
217 errno = EINVAL; // must ensure it's not eagain
222 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
// a message without a header cannot be sent; flag it in the message state and errno
223 if( msg->header == NULL ) {
224 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
225 msg->state = RMR_ERR_NOHDR;
226 errno = EBADMSG; // must ensure it's not eagain
230 send_again = 1; // force loop entry
231 group = 0; // always start with group 0
// one pass per group; uta_epsock_rr() rewrites send_again on every call
233 while( send_again ) {
234 nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again ); // round robin select endpoint; again set if mult groups
235 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
236 msg->mtype, send_again, group, nn_sock, msg->len );
// no endpoint path: NOTE(review) the test on nn_sock which selects this path is not visible here
240 msg->state = RMR_ERR_NOENDPT;
241 errno = ENXIO; // must ensure it's not eagain
242 return msg; // caller can resend (maybe) or free
// more groups remain: send the original and carry the clone forward as the next message to send
246 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
247 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
248 msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
249 msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success
252 // error do we need to count successes/errors, how to report some success, esp if last fails?
256 msg = clone_m; // clone will be the next to send
// final (or only) group: send_msg() is allowed to hand back a fresh buffer
258 msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was
262 return msg; // last message carries the status of last/only send attempt
266 Return to sender allows a message to be sent back to the endpoint where it originated.
267 The source information in the message is used to select the socket on which to write
268 the message rather than using the message type and round-robin selection. This
269 should return a message buffer with the state of the send operation set. On success
270 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
271 error it can be passed back to this function to retry the send if desired. On error,
272 errno will likely have the failure reason set by the nanomsg send processing.
273 The following are possible values for the state in the message buffer:
275 Message states returned:
276 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
277 RMR_ERR_NOHDR - message did not have a header
278 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
279 RMR_ERR_SENDFAILED - send failed; errno has nano error code
280 RMR_ERR_RETRY - operation failed, but caller should retry
282 A nil message as the return value is rare, and generally indicates some kind of horrible
283 failure. The value of errno might give a clue as to what is wrong.
286 Like send_msg(), this is non-blocking and will return the msg if there is an error.
287 The caller must check for this and handle.
// Return a message to the endpoint named in the message header's source field.
//   vctx - context pointer (cast to uta_ctx_t*)
//   msg  - received message to send back; its header src selects the socket
// The header source is overlaid with this process's name (ctx->my_name) for the send,
// and the original source is copied back into the message afterwards so that the
// caller may invoke this function again with the same buffer.
// NOTE(review): several lines (tests on nn_sock and on the send result, final return,
// closing braces) are not visible in this view.
289 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
290 int nn_sock; // endpoint socket for send
294 char* hold_src; // we need the original source if send fails
296 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
297 errno = EINVAL; // if msg is null, this is their clue
// NOTE(review): msg may be nil on this path; the guard expected around the next statement is not visible here
299 msg->state = RMR_ERR_BADARG;
304 errno = 0; // at this point any bad state is in msg returned
305 if( msg->header == NULL ) {
// NOTE(review): the text below names rmr_send_msg although it is written by rmr_rts_msg; likely a copy/paste slip
306 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
307 msg->state = RMR_ERR_NOHDR;
// the sender's name, as carried in the header, is the lookup key for the socket
311 nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src ); // socket of specific endpoint
// no endpoint path: NOTE(review) the test on nn_sock which selects this path is not visible here
313 msg->state = RMR_ERR_NOENDPT;
314 return msg; // preallocated msg can be reused since not given back to nn
// NOTE(review): strdup() allocates; neither a nil check nor the matching free( hold_src ) is visible in this view -- confirm both
317 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
// strncpy() adds no terminator when the source fills all RMR_MAX_SID bytes; init() builds my_name with
// snprintf() bounded by RMR_MAX_SID, so for contexts created there the terminator is copied too
318 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
319 msg = send_msg( ctx, msg, nn_sock );
// NOTE(review): msg is dereferenced below; presumably guarded by a test on msg that is not visible here
321 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again
322 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
330 Call sends the message based on message routing using the message type, and waits for a
331 response message to arrive with the same transaction id that was in the outgoing message.
332 If, while waiting for the expected response, messages are received which do not have the
333 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
334 order that they were received.
336 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
337 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
338 may be resent (likely the context pointer was nil). If the message is sent, but no
339 response is received, a nil message is returned with errno set to indicate the likely
341 ETIMEDOUT -- too many messages were queued before receiving the expected response
342 ENOBUFS -- the queued message ring is full, messages were dropped
343 EINVAL -- A parameter was not valid
344 EAGAIN -- the underlying message system was interrupted or the device was busy;
345 user should call this function with the message again.
348 QUESTION: should user specify the number of messages to allow to queue?
// Send a message and then wait for the response which carries the same transaction id.
//   vctx - context pointer (cast to uta_ctx_t*)
//   msg  - message to send; msg->xaction supplies the id that is waited for
// On a send problem the message is handed back with its state set; otherwise the
// result of rmr_rcv_specific() is returned.
// NOTE(review): the returns on the error paths and some braces are not visible in this view.
350 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
352 unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID
354 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
// NOTE(review): msg may be nil on this path; the guard expected around the next statement is not visible here
356 msg->state = RMR_ERR_BADARG;
// the id is captured before the send because the buffer is expected to be nil after a good send
361 memcpy( expected_id, msg->xaction, RMR_MAX_XID );
362 expected_id[RMR_MAX_XID] = 0; // ensure it's a string
363 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
365 msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
367 msg = rmr_send_msg( ctx, msg );
368 if( msg ) { // msg should be nil, if not there was a problem; return buffer to user
// a retry state set by the send is kept; any other state is replaced with call-failed
369 if( msg->state != RMR_ERR_RETRY ) {
370 msg->state = RMR_ERR_CALLFAILED; // don't stomp if send_msg set retry
// no buffer is offered for reuse (NULL); up to 20 unrelated messages may be queued while waiting
375 return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead
379 The outward facing receive function. When invoked it will pop the oldest message
380 from the receive ring, if any are queued, and return it. If the ring is empty
381 then the receive function is invoked to wait for the next message to arrive (blocking).
383 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
384 nil, a new one will be allocated. However, the caller should NOT expect to get the same
385 struct back (if a queued message is returned the message struct will be different).
// Receive the next message for the application.
//   vctx    - context pointer (cast to uta_ctx_t*)
//   old_msg - optional buffer from the caller which may be reused or released
// A message already queued on the context's ring (ctx->mring) is taken first; when the
// ring has nothing, rcv_msg() is called to wait for a message.
// NOTE(review): the returns on the error and queued-message paths are not visible in this view.
387 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
389 rmr_mbuf_t* qm; // message that was queued on the ring
391 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// without a context the only place to report the problem is the caller's buffer, if one was given
392 if( old_msg != NULL ) {
393 old_msg->state = RMR_ERR_BADARG;
400 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
// NOTE(review): presumably reached only when qm is not nil (the queued message is returned, so the
// caller's buffer is not needed); the test itself is not visible here
403 rmr_free_msg( old_msg ); // future: push onto a free list???
409 return rcv_msg( ctx, old_msg ); // nothing queued, wait for one
413 Receive with a timeout. This is a convenience function when sitting on top of
414 nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
416 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
417 rmr_set_rtimeout( vctx, ms_to );
418 return rmr_rcv_msg( vctx, old_msg );
423 This blocks until the message with the 'expect' ID is received. Messages which are received
424 before the expected message are queued onto the message ring. The function will return
425 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
426 expected message is received. If the queued message ring fills a nil pointer is returned
427 and errno is set to ENOBUFS.
429 Generally this will be invoked only by the call() function as it waits for a response, but
430 it is exposed to the user application as there is no reason not to.
// Receive messages until one arrives whose transaction id matches 'expect'.
//   vctx        - context pointer (cast to uta_ctx_t*)
//   msg         - optional buffer handed to the receive function
//   expect      - transaction id to wait for; nil or empty means "any message"
//   allow2queue - number of non-matching messages that may be queued before giving up
// Non-matching messages are inserted into the context's ring (ctx->mring).
// NOTE(review): the returns, the update of 'queued', and the errno settings described in
// the function comment are not visible in this view.
432 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
434 int queued = 0; // number we pushed into the ring
435 int exp_len = 0; // length of expected ID
437 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// NOTE(review): msg may be nil here (rmr_call passes NULL); the guard expected around the next statement is not visible
439 msg->state = RMR_ERR_BADARG;
447 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
448 return rmr_rcv_msg( ctx, msg );
// the compare length is capped at the size of the transaction id field
451 exp_len = strlen( expect );
452 if( exp_len > RMR_MAX_XID ) {
453 exp_len = RMR_MAX_XID;
455 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
457 while( queued < allow2queue ) {
458 msg = rcv_msg( ctx, msg ); // hard wait for next
// NOTE(review): msg is dereferenced immediately after rcv_msg() with no nil test in between;
// confirm that rcv_msg() can never return nil
459 if( msg->state == RMR_OK ) {
// only the first exp_len bytes are compared, so 'expect' matches any id which begins with it
460 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
// NOTE(review): xaction is printed with %s; confirm the field is always nil terminated
461 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
465 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
466 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
471 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
// NOTE(review): presumably reached once the loop ends without a match (allow2queue messages were queued)
477 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
484 Initialise the message routing environment. Flags are one of the UTAFL_
485 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
486 (tcp) to be used, then :port is all that is needed.
488 At the moment it seems that TCP really is the only viable protocol, but
489 we'll allow flexibility.
491 The return value is a void pointer which must be passed to most uta functions. On
492 error, a nil pointer is returned and errno should be set.
// Build and initialise a context: allocate it, size the buffers, create and bind the
// nanomsg listen socket, and (unless FL_NOTHREAD is set) start the route table collector.
//   uproto_port  - "proto:port", ":port" or "port" string; nil selects "tcp:4567"
//   max_msg_size - caller's maximum payload length; values <= 0 keep the default
//   flags        - option flags; FL_NOTHREAD suppresses the collector thread
// NOTE(review): the returns, the cleanup on the failure paths and the release of
// proto_port are not visible in this view; confirm nothing leaks when a step fails.
494 static void* init( char* uproto_port, int max_msg_size, int flags ) {
495 uta_ctx_t* ctx = NULL;
496 char bind_info[NN_SOCKADDR_MAX]; // bind info
497 char* proto = "tcp"; // pointer into the proto/port string user supplied
500 char wbuf[1024]; // work buffer
501 char* tok; // pointer at token in a buffer
503 char* interface = NULL; // interface to bind to pulled from RMR_BIND_IF if set
// announce the library build on stderr
505 fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
506 QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
// a private copy is taken in either case because the string is split in place further down
// NOTE(review): no nil check on the strdup() result is visible here
509 if( uproto_port == NULL ) {
510 proto_port = strdup( "tcp:4567" );
512 proto_port = strdup( uproto_port ); // so we can modify it
515 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
519 memset( ctx, 0, sizeof( uta_ctx_t ) );
522 ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
524 ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
// a caller supplied size is honoured only when it does not exceed the default; otherwise a warning is written
525 if( max_msg_size > 0 ) {
526 if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
527 ctx->max_plen = max_msg_size;
529 fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
// the full message length is the payload length plus the header
533 ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
535 uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
537 ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect
538 if( ctx->nn_sock < 0 ) {
539 fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
// split the private copy at the first ':' into protocol and port
544 if( (port = strchr( proto_port, ':' )) != NULL ) {
545 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
548 *(port++) = 0; // term proto string and point at port string
549 proto = proto_port; // user supplied proto so point at it rather than default
552 port = proto_port; // assume something like "1234" was passed
555 if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
556 fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
559 if( (tok = strchr( wbuf, '.' )) != NULL ) {
560 *tok = 0; // we don't keep domain portion
// NOTE(review): the malloc() result is handed straight to snprintf() by the next statement with
// no nil check in between
562 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
563 if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port
564 fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
// bind on every interface unless the environment names a specific one
568 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
569 interface = "0.0.0.0";
571 snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
572 if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions
573 fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
574 nn_close( ctx->nn_sock );
579 if( ! (flags & FL_NOTHREAD) ) { // skip if internal context that does not need rout table thread
580 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
// NOTE(review): pthread_create() reports failure through its return value rather than errno, so
// strerror( errno ) may describe an unrelated error; the text also has no trailing newline
581 fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
591 Publicly facing initialisation function. Wrapper for the init() funcion above
592 as it needs to ensure internal flags are masked off before calling the
595 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
596 return init( uproto_port, max_msg_size, flags & UFL_MASK );
600 Return true if routing table is initialised etc. and app can send/receive.
// Report whether the context is ready for the application to send and receive.
//   vctx - context pointer (cast to uta_ctx_t*)
// Readiness is judged by the presence of a route table in the context.
// NOTE(review): the return statements are not visible in this view; presumably a false value
// for a nil context or missing table and a true value otherwise -- confirm.
602 extern int rmr_ready( void* vctx ) {
605 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// a route table pointer which is not nil is the ready indication
609 if( ctx->rtable != NULL ) {
617 Provides a non-fatal (compile) interface for the nng only function.
618 Not supported on top of nano, so this always returns -1.
// Placeholder for the nng-only receive file descriptor function so that callers still compile.
//   vctx - context pointer
// NOTE(review): the body is not visible in this view; the function comment states that -1 is always returned.
620 extern int rmr_get_rcvfd( void* vctx ) {
626 Compatibility (mostly) with NNG.
// Close the nanomsg socket held by the context.
//   vctx - context pointer (cast to uta_ctx_t*)
// NOTE(review): only the socket close is visible in this view; whether the context itself
// (or anything it references) is released here must be confirmed.
628 extern void rmr_close( void* vctx ) {
// nothing can be closed without a context
631 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
635 nn_close( ctx->nn_sock );