3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: The bulk of the ric message routing library which is built upon
24 the older nanomsg messaging transport mechanism.
26 To "hide" internal functions the choice was made to implement them
27 all as static functions. This means that we include nearly
28 all of our modules here as 90% of the library is not visible to
31 Author: E. Scott Daniels
32 Date: 28 November 2018
46 #include <arpa/inet.h>
48 #include <nanomsg/nn.h>
49 #include <nanomsg/tcp.h>
50 #include <nanomsg/pair.h>
51 #include <nanomsg/pipeline.h>
52 #include <nanomsg/pubsub.h>
54 #include "rmr.h" // things the users see
55 #include "rmr_agnostic.h" // headers agnostic to the underlying transport mechanism
56 #include "rmr_private.h" // things that we need too
57 #include "rmr_symtab.h"
59 #include "ring_static.c" // message ring support
60 #include "rt_generic_static.c" // generic route table (not nng/nano specific)
61 #include "rtable_static.c" // route table things (nano specific)
62 #include "rtc_static.c" // common rt collector
63 #include "tools_static.c"
64 #include "sr_static.c" // send/receive static functions
65 #include "wormholes.c" // external wormhole api, and it's static functions (must be LAST)
67 // ------------------------------------------------------------------------------------------------------
// free_ctx: release memory owned by a context. The visible line frees
// ctx->rtg_addr; the rest of the body is not shown in this extract.
72 static void free_ctx( uta_ctx_t* ctx ) {
75 free( ctx->rtg_addr );
80 // --------------- public functions --------------------------------------------------------------------------
83 Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
84 receive and -1 is block for ever.
85 Returns the nn value (0 on success <0 on error).
// rmr_set_rtimeout: set the receive timeout (ms) on the context's listen socket
// via NN_RCVTIMEO; 0 means a non-blocking receive and -1 blocks forever.
// Returns the nn_setsockopt() result (0 on success, <0 on error). A NULL context
// is rejected (that return path is not visible in this extract).
87 extern int rmr_set_rtimeout( void* vctx, int time ) {
90 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// Skip the system call when the value matches the last one applied.
// NOTE(review): the early return value and the update of ctx->last_rto are on
// lines not visible here -- confirm last_rto is refreshed after a successful set.
95 if( ctx->last_rto == time ) {
101 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
105 Deprecated -- use rmr_set_rtimeout()
// rmr_rcv_to: deprecated alias for rmr_set_rtimeout(), kept for backward compatibility.
// Returns whatever rmr_set_rtimeout() returns (0 on success, <0 on error).
// NOTE: this must forward to rmr_set_rtimeout(); calling rmr_rcv_to() here would
// recurse until the stack overflows.
extern int rmr_rcv_to( void* vctx, int time ) {
	return rmr_set_rtimeout( vctx, time );
}
112 Set the send timeout to time. If time >1000 we assume the time is milliseconds,
113 else we assume seconds. Setting -1 is always block.
114 Returns the nn value (0 on success <0 on error).
// rmr_set_stimeout: set the send timeout via NN_SNDTIMEO and return the
// nn_setsockopt() result (0 on success, <0 on error). Per the header comment,
// small values are taken as seconds and scaled to ms below; -1 always blocks.
// NOTE(review): the condition guarding the scaling is not visible -- confirm it
// leaves -1 and 0 untouched.
// NOTE(review): the option is applied to ctx->nn_sock, which init() creates as an
// NN_PULL listen socket, while rmr_send_msg() sends on sockets chosen by
// uta_epsock_rr(); confirm this timeout actually affects outbound sends.
116 extern int rmr_set_stimeout( void* vctx, int time ) {
119 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
126 time = time * 1000; // assume seconds, nn wants ms
130 return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
134 Deprecated -- use rmr_set_stimeout()
// rmr_send_to: deprecated alias for rmr_set_stimeout(), kept for backward compatibility.
// Returns whatever rmr_set_stimeout() returns (0 on success, <0 on error).
// NOTE: this must forward to rmr_set_stimeout(); calling rmr_send_to() here would
// recurse until the stack overflows.
extern int rmr_send_to( void* vctx, int time ) {
	return rmr_set_stimeout( vctx, time );
}
141 Returns the size of the payload (bytes) that the msg buffer references.
142 Len in a message is the number of bytes which were received, or should
143 be transmitted, however, it is possible that the mbuf was allocated
144 with a larger payload space than the payload length indicates; this
145 function returns the absolute maximum space that the user has available
146 in the payload. On error (bad msg buffer) -1 is returned and errno should
// rmr_payload_size: total payload capacity of the buffer, i.e. the transport
// allocation (alloc_len) less the RMR header. This can exceed msg->len.
// A NULL msg or header is an error (handled on lines not visible here).
149 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
150 if( msg == NULL || msg->header == NULL ) {
156 return msg->alloc_len - RMR_HDR_LEN( msg->header ); // transport buffer less header and other data bits
160 Allocates a send message as a zerocopy message allowing the underlying message protocol
161 to send the buffer without copy.
// rmr_alloc_msg: allocate a zero-copy send buffer of `size` (presumably payload
// bytes) with trace length DEF_TR_LEN. A NULL context is rejected (that return
// path is not visible here).
// NOTE(review): presumably alloc_zcmsg() maps DEF_TR_LEN to the context default
// set by rmr_init_trace() -- confirm.
163 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
167 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
171 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
176 Allocates a send message as a zerocopy message allowing the underlying message protocol
177 to send the buffer without copy. In addition, a trace data field of tr_size will be
178 added and the supplied data copied to the buffer before returning the message to
// rmr_tralloc_msg: like rmr_alloc_msg(), but with a trace area of tr_size bytes
// that is filled from `data`. If rmr_set_trace() does not report exactly tr_size
// bytes, the message state is set to RMR_ERR_INITFAILED.
181 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
186 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
190 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
192 state = rmr_set_trace( m, data, tr_size ); // roll their data in
193 if( state != tr_size ) {
194 m->state = RMR_ERR_INITFAILED;
202 Need an external path to the realloc static function as it's called by an
203 outward facing mbuf api function.
// rmr_realloc_msg: public wrapper around the file-static realloc_msg(), so that
// outward-facing mbuf API code can reach it. new_tr_size is presumably the new
// trace-data length.
205 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
206 return realloc_msg( msg, new_tr_size );
210 Return the message to the available pool, or free it outright.
// rmr_free_msg: release a message buffer. A zero-copy header (MFL_ZEROCOPY) is
// owned by nanomsg and must go back through nn_freemsg(). Presumably free() is
// used otherwise; the `else` line is not visible in this extract.
212 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
218 if( mbuf->flags & MFL_ZEROCOPY ) {
219 nn_freemsg( mbuf->header ); // must let nano free it
221 free( mbuf->header );
229 Accept a message and send it to an endpoint based on message type.
230 Allocates a new message buffer for the next send. If a message type has
231 more than one group of endpoints defined, then the message will be sent
232 in round robin fashion to one endpoint in each group.
234 CAUTION: this is a non-blocking send. If the message cannot be sent, then
235 it will return with an error and errno set to eagain. If the send is
236 a limited fanout, then the returned status is the status of the last
// rmr_send_msg: route msg by (sub_id, mtype) and send it to one endpoint per
// group defined for that key, round robin within each group.
//  - If sub_id is set and no endpoint is found, the lookup is retried with a
//    key built from mtype alone (altk_ok).
//  - With several groups, the message is cloned before each non-final send,
//    because a sent buffer is no longer available.
//  - Each send is repeated while its state is RMR_ERR_RETRY and max_rt > 0.
// Error returns hand back the message with state and errno set:
//  - RMR_ERR_BADARG with EINVAL
//  - RMR_ERR_NOHDR with EBADMSG
//  - RMR_ERR_NOENDPT with ENXIO
// Otherwise the returned buffer carries the status of the last send attempt.
239 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
240 int nn_sock; // endpoint socket for send
242 int group; // selected group to get socket for
243 int send_again; // true if the message must be sent again
244 rmr_mbuf_t* clone_m; // cloned message for an nth send
245 uint64_t key; // lookup key is now subid and mtype
247 int altk_ok = 0; // ok to retry with alt key when true
249 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
250 errno = EINVAL; // if msg is null, this is their clue
252 msg->state = RMR_ERR_BADARG;
253 errno = EINVAL; // must ensure it's not eagain
258 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
259 if( msg->header == NULL ) {
260 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
261 msg->state = RMR_ERR_NOHDR;
262 errno = EBADMSG; // must ensure it's not eagain
266 send_again = 1; // force loop entry
267 group = 0; // always start with group 0
269 key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry
270 if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used
274 while( send_again ) {
276 nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups
277 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
278 msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
281 if( altk_ok ) { // ok to retry with alternate key
282 key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry
288 msg->state = RMR_ERR_NOENDPT;
289 errno = ENXIO; // must ensure it's not eagain
290 return msg; // caller can resend (maybe) or free
295 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
296 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
297 msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
298 msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success
// NOTE(review): max_rt is declared and (presumably) decremented on lines not
// visible here. If it is never decremented, these retry loops spin for as long as
// the transport keeps reporting RMR_ERR_RETRY.
299 while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
300 msg = send_msg( ctx, msg, nn_sock );
304 msg = clone_m; // clone will be the next to send
306 msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was
307 while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
308 msg = send_msg( ctx, msg, nn_sock );
314 return msg; // last message carries the status of last/only send attempt
318 Return to sender allows a message to be sent back to the endpoint where it originated.
319 The source information in the message is used to select the socket on which to write
320 the message rather than using the message type and round-robin selection. This
321 should return a message buffer with the state of the send operation set. On success
322 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
323 error it can be passed back to this function to retry the send if desired. On error,
324 errno will likely have the failure reason set by the nanomsg send processing.
325 The following are possible values for the state in the message buffer:
327 Message states returned:
328 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
329 RMR_ERR_NOHDR - message did not have a header
330 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
331 RMR_ERR_SENDFAILED - send failed; errno has nano error code
332 RMR_ERR_RETRY - operation failed, but caller should retry
334 A nil message as the return value is rare, and generally indicates some kind of horrible
335 failure. The value of errno might give a clue as to what is wrong.
338 Like send_msg(), this is non-blocking and will return the msg if there is an error.
339 The caller must check for this and handle.
// rmr_rts_msg: send msg back to the endpoint named in its header's src field,
// rather than routing by message type. During the send, src is overwritten with
// this process's name (ctx->my_name). The original src is then restored so the
// caller can call rmr_rts_msg() again.
341 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
342 int nn_sock; // endpoint socket for send
346 char* hold_src; // we need the original source if send fails
348 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
349 errno = EINVAL; // if msg is null, this is their clue
351 msg->state = RMR_ERR_BADARG;
356 errno = 0; // at this point any bad state is in msg returned
357 if( msg->header == NULL ) {
// NOTE(review): this message names rmr_send_msg and lacks the "[ERR]" prefix that
// rmr_send_msg uses -- likely a copy/paste; consider "[ERR] rmr_rts_msg: ...".
358 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
359 msg->state = RMR_ERR_NOHDR;
363 nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src ); // socket of specific endpoint
365 msg->state = RMR_ERR_NOENDPT;
366 return msg; // preallocated msg can be reused since not given back to nn
// NOTE(review): strdup() may return NULL and is not checked before hold_src is used
// below. The matching free() is not visible -- confirm hold_src is released.
369 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
370 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
371 msg = send_msg( ctx, msg, nn_sock );
373 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again
374 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
382 Call sends the message based on message routing using the message type, and waits for a
383 response message to arrive with the same transaction id that was in the outgoing message.
384 If, while waiting for the expected response, messages are received which do not have the
385 desired transaction ID, they are queued. Calls to rmr_rcv_msg() will dequeue them in the
386 order that they were received.
388 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
389 to ensure that no error was encountered. If the state is RMR_ERR_BADARG, then the message
390 may be resent (likely the context pointer was nil). If the message is sent, but no
391 response is received, a nil message is returned with errno set to indicate the likely
393 ETIMEDOUT -- too many messages were queued before receiving the expected response
394 ENOBUFS -- the queued message ring is full, messages were dropped
395 EINVAL -- A parameter was not valid
396 EAGAIN -- the underlying message system was interrupted or the device was busy;
397 user should call this function with the message again.
400 QUESTION: should user specify the number of messages to allow to queue?
// rmr_call: send msg (routed by type), then wait in rmr_rcv_specific() for a
// response with the same transaction id. Up to 20 non-matching messages may be
// queued on the context's ring meanwhile.
// MFL_NOALLOC is set, so a non-NULL return from rmr_send_msg() means the send
// failed. That buffer is handed back with state RMR_ERR_CALLFAILED, unless the
// send reported RMR_ERR_RETRY, which is preserved.
402 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
404 unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID
406 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
408 msg->state = RMR_ERR_BADARG;
// Copy the id and terminate the copy explicitly; xaction itself may not be
// NUL-terminated.
413 memcpy( expected_id, msg->xaction, RMR_MAX_XID );
414 expected_id[RMR_MAX_XID] = 0; // ensure it's a string
415 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
417 msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
419 msg = rmr_send_msg( ctx, msg );
420 if( msg ) { // msg should be nil, if not there was a problem; return buffer to user
421 if( msg->state != RMR_ERR_RETRY ) {
422 msg->state = RMR_ERR_CALLFAILED; // don't stomp if send_msg set retry
// The queue allowance (20) is hard-coded; see the QUESTION in the header comment.
427 return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead
431 The outward facing receive function. When invoked it will pop the oldest message
432 from the receive ring, if any are queued, and return it. If the ring is empty
433 then the receive function is invoked to wait for the next message to arrive (blocking).
435 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
436 nil, a new one will be allocated. However, the caller should NOT expect to get the same
437 struct back (if a queued message is returned the message struct will be different).
// rmr_rcv_msg: if a message is queued on the context's ring (rmr_rcv_specific()
// queues there), pop it and free old_msg. Otherwise wait in rcv_msg(), reusing
// old_msg when supplied. A NULL context sets RMR_ERR_BADARG on old_msg, if one
// was given.
439 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
441 rmr_mbuf_t* qm; // message that was queued on the ring
443 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
444 if( old_msg != NULL ) {
445 old_msg->state = RMR_ERR_BADARG;
452 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
455 rmr_free_msg( old_msg ); // future: push onto a free list???
461 return rcv_msg( ctx, old_msg ); // nothing queued, wait for one
465 Receive with a timeout. This is a convenience function when sitting on top of
466 nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
// rmr_torcv_msg: receive with a timeout of ms_to milliseconds. The socket timeout
// is reset only when it differs from the last one applied; then rmr_rcv_msg() is
// called. A message already queued on the ring is presumably returned without
// waiting. With a NULL context, rmr_rcv_msg() appears to report the bad argument.
468 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
471 if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
472 if( ctx->last_rto != ms_to ) { // avoid call overhead
473 rmr_set_rtimeout( vctx, ms_to );
477 return rmr_rcv_msg( vctx, old_msg );
482 This blocks until the message with the 'expect' ID is received. Messages which are received
483 before the expected message are queued onto the message ring. The function will return
484 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
485 expected message is received. If the queued message ring fills a nil pointer is returned
486 and errno is set to ENOBUFS.
488 Generally this will be invoked only by the call() function as it waits for a response, but
489 it is exposed to the user application as there is no reason not to.
// rmr_rcv_specific: wait for a message whose transaction id matches `expect`,
// queuing up to allow2queue non-matching messages on the context's ring.
// Matching compares the first min(strlen(expect), RMR_MAX_XID) bytes of xaction.
// So an `expect` shorter than RMR_MAX_XID matches any xaction starting with it.
// A NULL or empty `expect` becomes a plain rmr_rcv_msg().
491 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
493 int queued = 0; // number we pushed into the ring
494 int exp_len = 0; // length of expected ID
496 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
498 msg->state = RMR_ERR_BADARG;
506 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
507 return rmr_rcv_msg( ctx, msg );
510 exp_len = strlen( expect );
511 if( exp_len > RMR_MAX_XID ) {
512 exp_len = RMR_MAX_XID;
514 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
516 while( queued < allow2queue ) {
// NOTE(review): rcv_msg()'s result is dereferenced without a NULL check; confirm
// rcv_msg() never returns NULL.
517 msg = rcv_msg( ctx, msg ); // hard wait for next
518 if( msg->state == RMR_OK ) {
519 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
// NOTE(review): %s assumes xaction is NUL-terminated, but rmr_call() terminates its
// own copy explicitly; consider "%.*s" with RMR_MAX_XID.
520 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
524 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
525 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
530 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
536 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
543 Initialise the message routing environment. Flags are one of the UTAFL_
544 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
545 (tcp) to be used, then :port is all that is needed.
547 At the moment it seems that TCP really is the only viable protocol, but
548 we'll allow flexibility.
550 The return value is a void pointer which must be passed to most uta functions. On
551 error, a nil pointer is returned and errno should be set.
// init: create a context and return it as void*; NULL with errno set on error.
//  - uproto_port is "proto:port", ":port" or "port". NULL means "tcp:4567", and
//    the protocol defaults to tcp.
//  - max_msg_size > 0 may lower the default max payload length. Larger values
//    trigger a warning, and the default is kept.
//  - Binds an NN_PULL listen socket on the interface from ENV_BIND_IF, or 0.0.0.0.
//  - ctx->my_name becomes "<short hostname>:<port>" and must fit in RMR_MAX_SID.
//  - Unless FL_NOTHREAD is set, starts the route table collector thread (rtc).
553 static void* init( char* uproto_port, int max_msg_size, int flags ) {
554 uta_ctx_t* ctx = NULL;
555 char bind_info[NN_SOCKADDR_MAX]; // bind info
556 char* proto = "tcp"; // pointer into the proto/port string user supplied
559 char wbuf[1024]; // work buffer
560 char* tok; // pointer at token in a buffer
562 char* interface = NULL; // interface to bind to pulled from RMR_BIND_IF if set
564 fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
565 QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
// NOTE(review): neither strdup() result is checked on the visible lines; a NULL
// would later reach strchr().
568 if( uproto_port == NULL ) {
569 proto_port = strdup( "tcp:4567" );
571 proto_port = strdup( uproto_port ); // so we can modify it
574 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
578 memset( ctx, 0, sizeof( uta_ctx_t ) );
581 ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
582 ctx->last_rto = -2; // last receive timeout that was set; invalid value to force first to set
// NOTE(review): sizeof( uta_mhdr_t ) is added both to the default max_plen and
// again to max_mlen below -- confirm the header is meant to be counted twice.
584 ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
585 if( max_msg_size > 0 ) {
586 if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
587 ctx->max_plen = max_msg_size;
589 fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
593 ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
595 uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
597 ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect
598 if( ctx->nn_sock < 0 ) {
599 fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
604 if( (port = strchr( proto_port, ':' )) != NULL ) {
605 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
608 *(port++) = 0; // term proto string and point at port string
609 proto = proto_port; // user supplied proto so point at it rather than default
612 port = proto_port; // assume something like "1234" was passed
615 if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
616 fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
619 if( (tok = strchr( wbuf, '.' )) != NULL ) {
620 *tok = 0; // we don't keep domain portion
// NOTE(review): malloc() result is not checked before snprintf() writes into it.
622 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
623 if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port
624 fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
628 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
629 interface = "0.0.0.0";
631 snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
632 if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions
633 fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
634 nn_close( ctx->nn_sock );
639 if( ! (flags & FL_NOTHREAD) ) { // skip if internal context that does not need route table thread
// NOTE(review): pthread_create() returns the error number and does not set errno,
// so strerror( errno ) below may report an unrelated error. The message also lacks
// a trailing newline. Capture the return value and pass it to strerror().
640 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
641 fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
650 This sets the default trace length which will be added to any message buffers
651 allocated. It can be set at any time, and if rmr_set_trace() is given a
652 trace len that is different than the default allocated in a message, the message
655 Returns 0 on failure and 1 on success. If failure, then errno will be set.
// rmr_init_trace: record tr_len as the context's default trace-data length
// (ctx->trace_data_len). A NULL context is rejected; the return statements are
// not visible here. NOTE(review): tr_len is not range-checked on the visible lines.
657 extern int rmr_init_trace( void* vctx, int tr_len ) {
661 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
666 ctx->trace_data_len = tr_len;
671 Publicly facing initialisation function. Wrapper for the init() function above
672 as it needs to ensure internal flags are masked off before calling the
// rmr_init: public entry point. Masks the caller's flags with UFL_MASK so that
// internal-only flag bits cannot be set, then forwards to init().
675 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
676 return init( uproto_port, max_msg_size, flags & UFL_MASK );
680 Return true if routing table is initialised etc. and app can send/receive.
// rmr_ready: reports whether a route table has been installed (ctx->rtable is
// non-NULL). A NULL context is handled separately; the return statements are not
// visible in this extract.
682 extern int rmr_ready( void* vctx ) {
685 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
689 if( ctx->rtable != NULL ) {
697 Provides a non-fatal (compile) interface for the nng only function.
698 Not supported on top of nano, so this always returns -1.
// rmr_get_rcvfd: NNG-only API, present so nanomsg builds still compile and link.
// Per the header comment it always returns -1; the body is not visible here.
700 extern int rmr_get_rcvfd( void* vctx ) {
706 Compatibility (mostly) with NNG.
// rmr_close: close the context's listen socket. The visible lines do not release
// other context resources (ring, route table, collector thread, my_name).
708 extern void rmr_close( void* vctx ) {
711 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
715 nn_close( ctx->nn_sock );