// : vi ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019 Nokia
+ Copyright (c) 2019 Nokia
Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
/*
Mnemonic: rmr_nng.c
- Abstract: This is the compile point for the nng version of the rmr
+ Abstract: This is the compile point for the nng version of the rmr
library (formarly known as uta, so internal function names
are likely still uta_*)
- With the exception of the symtab portion of the library,
- RMr is built with a single compile so as to "hide" the
+ With the exception of the symtab portion of the library,
+ RMr is built with a single compile so as to "hide" the
internal functions as statics. Because they interdepend
on each other, and CMake has issues with generating two
different wormhole objects from a single source, we just
pull it all together with a centralised comple using
- includes.
+ includes.
Future: the API functions at this point can be separated
into a common source module.
Returns the size of the payload (bytes) that the msg buffer references.
Len in a message is the number of bytes which were received, or should
be transmitted, however, it is possible that the mbuf was allocated
- with a larger payload space than the payload length indicates; this
+ with a larger payload space than the payload length indicates; this
function returns the absolute maximum space that the user has available
in the payload. On error (bad msg buffer) -1 is returned and errno should
indicate the rason.
}
}
}
-
+
free( mbuf );
}
/*
send message with maximum timeout.
- Accept a message and send it to an endpoint based on message type.
+ Accept a message and send it to an endpoint based on message type.
If NNG reports that the send attempt timed out, or should be retried,
RMr will retry for approximately max_to microseconds; rounded to the next
higher value of 10.
Allocates a new message buffer for the next send. If a message type has
more than one group of endpoints defined, then the message will be sent
- in round robin fashion to one endpoint in each group.
+ in round robin fashion to one endpoint in each group.
CAUTION: this is a non-blocking send. If the message cannot be sent, then
it will return with an error and errno set to eagain. If the send is
a limited fanout, then the returned status is the status of the last
send attempt.
-
+
*/
extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
nng_socket nn_sock; // endpoint socket for send
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
errno = EINVAL; // must ensure it's not eagain
- }
+ }
return msg;
}
while( send_again ) {
sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
msg->mtype, send_again, group, msg->len, sock_ok );
group++;
/*
if( msg ) {
// error do we need to count successes/errors, how to report some success, esp if last fails?
- }
+ }
*/
msg = clone_m; // clone will be the next to send
}
/*
- Send with default max timeout as is set in the context.
+ Send with default max timeout as is set in the context.
See rmr_mtosend_msg() for more details on the parameters.
See rmr_stimeout() for info on setting the default timeout.
*/
}
/*
- Return to sender allows a message to be sent back to the endpoint where it originated.
+ Return to sender allows a message to be sent back to the endpoint where it originated.
The source information in the message is used to select the socket on which to write
- the message rather than using the message type and round-robin selection. This
+ the message rather than using the message type and round-robin selection. This
should return a message buffer with the state of the send operation set. On success
(state is RMR_OK, the caller may use the buffer for another receive operation), and on
error it can be passed back to this function to retry the send if desired. On error,
errno = EINVAL; // if msg is null, this is their clue
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
return msg;
}
Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
to ensure that no error was encountered. If the state is UTA_BADARG, then the message
- may be resent (likely the context pointer was nil). If the message is sent, but no
+ may be resent (likely the context pointer was nil). If the message is sent, but no
response is received, a nil message is returned with errno set to indicate the likley
issue:
ETIMEDOUT -- too many messages were queued before reciving the expected response
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
return msg;
}
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( old_msg != NULL ) {
old_msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return old_msg;
}
}
/*
- This implements a receive with a timeout via epoll. Mostly this is for
- wrappers as native C applications can use epoll directly and will not have
+ This implements a receive with a timeout via epoll. Mostly this is for
+ wrappers as native C applications can use epoll directly and will not have
to depend on this.
*/
extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( old_msg != NULL ) {
old_msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return old_msg;
}
}
if( (eps = ctx->eps) == NULL ) { // set up epoll on first call
- eps = malloc( sizeof *eps );
+ eps = malloc( sizeof *eps );
- if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
- fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
+ if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
+ fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
free( eps );
return NULL;
- }
+ }
eps->nng_fd = rmr_get_rcvfd( ctx );
eps->epe.events = EPOLLIN;
- eps->epe.data.fd = eps->nng_fd;
+ eps->epe.data.fd = eps->nng_fd;
- if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
- fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+ if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
+ fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
free( eps );
return NULL;
- }
+ }
ctx->eps = eps;
}
/*
This blocks until the message with the 'expect' ID is received. Messages which are received
- before the expected message are queued onto the message ring. The function will return
+ before the expected message are queued onto the message ring. The function will return
a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
expected message is received. If the queued message ring fills a nil pointer is returned
and errno is set to ENOBUFS.
- Generally this will be invoked only by the call() function as it waits for a response, but
+ Generally this will be invoked only by the call() function as it waits for a response, but
it is exposed to the user application as three is no reason not to.
*/
extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
uta_ctx_t* ctx;
int queued = 0; // number we pushed into the ring
int exp_len = 0; // length of expected ID
-
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return msg;
}
// CAUTION: these are not supported as they must be set differently (between create and open) in NNG.
// until those details are worked out, these generate a warning.
/*
- Set send timeout. The value time is assumed to be microseconds. The timeout is the
+ Set send timeout. The value time is assumed to be microseconds. The timeout is the
rough maximum amount of time that RMr will block on a send attempt when the underlying
mechnism indicates eagain or etimeedout. All other error conditions are reported
without this delay. Setting a timeout of 0 causes no retries to be attempted in
RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
- but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
+ but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
after every 10K send attempts until the time value is reached. Retries are abandoned
if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
/*
This is the actual init workhorse. The user visible function meerly ensures that the
- calling programme does NOT set any internal flags that are supported, and then
+ calling programme does NOT set any internal flags that are supported, and then
invokes this. Internal functions (the route table collector) which need additional
- open ports without starting additional route table collectors, will invoke this
+ open ports without starting additional route table collectors, will invoke this
directly with the proper flag.
*/
static void* init( char* uproto_port, int max_msg_size, int flags ) {
char* port;
char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
char* proto_port;
- char wbuf[1024]; // work buffer
+ char wbuf[1024]; // work buffer
char* tok; // pointer at token in a buffer
int state;
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
+ fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
Flags:
No user flags supported (needed) at the moment, but this provides for extension
- without drastically changing anything. The user should invoke with RMRFL_NONE to
+ without drastically changing anything. The user should invoke with RMRFL_NONE to
avoid any misbehavour as there are internal flags which are suported
*/
extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
/*
This sets the default trace length which will be added to any message buffers
- allocated. It can be set at any time, and if rmr_set_trace() is given a
+ allocated. It can be set at any time, and if rmr_set_trace() is given a
trace len that is different than the default allcoated in a message, the message
will be resized.
}
/*
- Returns a file descriptor which can be used with epoll() to signal a receive
- pending. The file descriptor should NOT be read from directly, nor closed, as NNG
- does not support this.
+ Returns a file descriptor which can be used with epoll() to signal a receive
+ pending. The file descriptor should NOT be read from directly, nor closed, as NNG
+ does not support this.
*/
extern int rmr_get_rcvfd( void* vctx ) {
uta_ctx_t* ctx;
Clean up things.
There isn't an nng_flush() per se, but we can pause, generate
- a context switch, which should allow the last sent buffer to
+ a context switch, which should allow the last sent buffer to
flow. There isn't exactly an nng_term/close either, so there
isn't much we can do.
*/