// : vi ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019 Nokia
+ Copyright (c) 2019 Nokia
Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
/*
Mnemonic: rmr_nng.c
- Abstract: This is the compile point for the nng version of the rmr
+ Abstract: This is the compile point for the nng version of the rmr
library (formarly known as uta, so internal function names
are likely still uta_*)
- With the exception of the symtab portion of the library,
- RMr is built with a single compile so as to "hide" the
+ With the exception of the symtab portion of the library,
+ RMr is built with a single compile so as to "hide" the
internal functions as statics. Because they interdepend
on each other, and CMake has issues with generating two
different wormhole objects from a single source, we just
pull it all together with a centralised comple using
- includes.
+ includes.
Future: the API functions at this point can be separated
into a common source module.
Returns the size of the payload (bytes) that the msg buffer references.
Len in a message is the number of bytes which were received, or should
be transmitted, however, it is possible that the mbuf was allocated
- with a larger payload space than the payload length indicates; this
+ with a larger payload space than the payload length indicates; this
function returns the absolute maximum space that the user has available
in the payload. On error (bad msg buffer) -1 is returned and errno should
indicate the rason.
}
errno = 0;
- return msg->alloc_len - sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
+ return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits
}
/*
return NULL;
}
- m = alloc_zcmsg( ctx, NULL, size, 0 );
+ m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
return m;
}
+
+/*
+ Allocates a send message as a zerocopy message allowing the underlying message protocol
+ to send the buffer without copy. In addition, a trace data field of tr_size will be
+ added and the supplied data coppied to the buffer before returning the message to
+ the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+ uta_ctx_t* ctx;
+ rmr_mbuf_t* m;
+ int state;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return NULL;
+ }
+
+ m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
+ if( m != NULL ) {
+ state = rmr_set_trace( m, data, tr_size ); // roll their data in
+ if( state != tr_size ) {
+ m->state = RMR_ERR_INITFAILED;
+ }
+ }
+
+ return m;
+}
+
+/*
+ This provides an external path to the realloc static function as it's called by an
+ outward facing mbuf api function. Used to reallocate a message with a different
+ trace data size.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+ return realloc_msg( msg, new_tr_size );
+}
+
+
/*
Return the message to the available pool, or free it outright.
*/
}
}
}
-
+
free( mbuf );
}
/*
send message with maximum timeout.
- Accept a message and send it to an endpoint based on message type.
+ Accept a message and send it to an endpoint based on message type.
If NNG reports that the send attempt timed out, or should be retried,
RMr will retry for approximately max_to microseconds; rounded to the next
higher value of 10.
Allocates a new message buffer for the next send. If a message type has
more than one group of endpoints defined, then the message will be sent
- in round robin fashion to one endpoint in each group.
+ in round robin fashion to one endpoint in each group.
+
+ An endpoint will be looked up in the route table using the message type and
+ the subscription id. If the subscription id is "UNSET_SUBID", then only the
+ message type is used. If the initial lookup, with a subid, fails, then a
+ second lookup using just the mtype is tried.
CAUTION: this is a non-blocking send. If the message cannot be sent, then
it will return with an error and errno set to eagain. If the send is
a limited fanout, then the returned status is the status of the last
send attempt.
-
+
*/
extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
nng_socket nn_sock; // endpoint socket for send
int send_again; // true if the message must be sent again
rmr_mbuf_t* clone_m; // cloned message for an nth send
int sock_ok; // got a valid socket from round robin select
+ uint64_t key; // mtype or sub-id/mtype sym table key
+ int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
errno = EINVAL; // must ensure it's not eagain
- }
+ }
return msg;
}
send_again = 1; // force loop entry
group = 0; // always start with group 0
+ key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
+ if( msg->sub_id != UNSET_SUBID ) {
+ altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+ }
while( send_again ) {
- sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
- msg->mtype, send_again, group, msg->len, sock_ok );
- group++;
+ sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
if( ! sock_ok ) {
+ if( altk_ok ) { // we can try with the alternate (no sub-id) key
+ altk_ok = 0;
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
+ send_again = 1; // ensure we don't exit the while
+ continue;
+ }
+
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
+ group++;
+
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
/*
if( msg ) {
// error do we need to count successes/errors, how to report some success, esp if last fails?
- }
+ }
*/
msg = clone_m; // clone will be the next to send
}
/*
- Send with default max timeout as is set in the context.
+ Send with default max timeout as is set in the context.
See rmr_mtosend_msg() for more details on the parameters.
See rmr_stimeout() for info on setting the default timeout.
*/
}
/*
- Return to sender allows a message to be sent back to the endpoint where it originated.
+ Return to sender allows a message to be sent back to the endpoint where it originated.
The source information in the message is used to select the socket on which to write
- the message rather than using the message type and round-robin selection. This
+ the message rather than using the message type and round-robin selection. This
should return a message buffer with the state of the send operation set. On success
(state is RMR_OK, the caller may use the buffer for another receive operation), and on
error it can be passed back to this function to retry the send if desired. On error,
errno = EINVAL; // if msg is null, this is their clue
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
return msg;
}
Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
to ensure that no error was encountered. If the state is UTA_BADARG, then the message
- may be resent (likely the context pointer was nil). If the message is sent, but no
+ may be resent (likely the context pointer was nil). If the message is sent, but no
response is received, a nil message is returned with errno set to indicate the likley
issue:
ETIMEDOUT -- too many messages were queued before reciving the expected response
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
return msg;
}
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( old_msg != NULL ) {
old_msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return old_msg;
}
}
/*
- This implements a receive with a timeout via epoll. Mostly this is for
- wrappers as native C applications can use epoll directly and will not have
+ This implements a receive with a timeout via epoll. Mostly this is for
+ wrappers as native C applications can use epoll directly and will not have
to depend on this.
*/
extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( old_msg != NULL ) {
old_msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return old_msg;
}
}
if( (eps = ctx->eps) == NULL ) { // set up epoll on first call
- eps = malloc( sizeof *eps );
+ eps = malloc( sizeof *eps );
- if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
- fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
+ if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
+ fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
free( eps );
return NULL;
- }
+ }
eps->nng_fd = rmr_get_rcvfd( ctx );
eps->epe.events = EPOLLIN;
- eps->epe.data.fd = eps->nng_fd;
+ eps->epe.data.fd = eps->nng_fd;
- if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
- fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+ if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
+ fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
free( eps );
return NULL;
- }
+ }
ctx->eps = eps;
}
if( old_msg ) {
msg = old_msg;
} else {
- msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
+ msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
}
if( ms_to < 0 ) {
/*
This blocks until the message with the 'expect' ID is received. Messages which are received
- before the expected message are queued onto the message ring. The function will return
+ before the expected message are queued onto the message ring. The function will return
a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
expected message is received. If the queued message ring fills a nil pointer is returned
and errno is set to ENOBUFS.
- Generally this will be invoked only by the call() function as it waits for a response, but
+ Generally this will be invoked only by the call() function as it waits for a response, but
it is exposed to the user application as three is no reason not to.
*/
extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
uta_ctx_t* ctx;
int queued = 0; // number we pushed into the ring
int exp_len = 0; // length of expected ID
-
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return msg;
}
// CAUTION: these are not supported as they must be set differently (between create and open) in NNG.
// until those details are worked out, these generate a warning.
/*
- Set send timeout. The value time is assumed to be microseconds. The timeout is the
+ Set send timeout. The value time is assumed to be microseconds. The timeout is the
rough maximum amount of time that RMr will block on a send attempt when the underlying
mechnism indicates eagain or etimeedout. All other error conditions are reported
without this delay. Setting a timeout of 0 causes no retries to be attempted in
RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
- but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
+ but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
after every 10K send attempts until the time value is reached. Retries are abandoned
if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
/*
This is the actual init workhorse. The user visible function meerly ensures that the
- calling programme does NOT set any internal flags that are supported, and then
+ calling programme does NOT set any internal flags that are supported, and then
invokes this. Internal functions (the route table collector) which need additional
- open ports without starting additional route table collectors, will invoke this
+ open ports without starting additional route table collectors, will invoke this
directly with the proper flag.
*/
static void* init( char* uproto_port, int max_msg_size, int flags ) {
char* port;
char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
char* proto_port;
- char wbuf[1024]; // work buffer
+ char wbuf[1024]; // work buffer
char* tok; // pointer at token in a buffer
int state;
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n",
- QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
+ RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
- ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
+ ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
if( max_msg_size > 0 ) {
- if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
- ctx->max_plen = max_msg_size;
- } else {
- fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
- }
+ ctx->max_plen = max_msg_size;
}
- ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
-
// we're using a listener to get rtg updates, so we do NOT need this.
//uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
Flags:
No user flags supported (needed) at the moment, but this provides for extension
- without drastically changing anything. The user should invoke with RMRFL_NONE to
+ without drastically changing anything. The user should invoke with RMRFL_NONE to
avoid any misbehavour as there are internal flags which are suported
*/
extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
}
+/*
+ This sets the default trace length which will be added to any message buffers
+ allocated. It can be set at any time, and if rmr_set_trace() is given a
+ trace len that is different than the default allcoated in a message, the message
+ will be resized.
+
+ Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+ uta_ctx_t* ctx;
+
+ errno = 0;
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ return 0;
+ }
+
+ ctx->trace_data_len = tr_len;
+ return 1;
+}
+
/*
Return true if routing table is initialised etc. and app can send/receive.
*/
}
/*
- Returns a file descriptor which can be used with epoll() to signal a receive
- pending. The file descriptor should NOT be read from directly, nor closed, as NNG
- does not support this.
+ Returns a file descriptor which can be used with epoll() to signal a receive
+ pending. The file descriptor should NOT be read from directly, nor closed, as NNG
+ does not support this.
*/
extern int rmr_get_rcvfd( void* vctx ) {
uta_ctx_t* ctx;
}
if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) );
+ fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
return -1;
}
Clean up things.
There isn't an nng_flush() per se, but we can pause, generate
- a context switch, which should allow the last sent buffer to
+ a context switch, which should allow the last sent buffer to
flow. There isn't exactly an nng_term/close either, so there
isn't much we can do.
*/