// vim: ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 Nokia
+ Copyright (c) 2018-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
#include "rmr_agnostic.h" // agnostic things (must be included before private)
#include "rmr_nng_private.h" // things that we need too
#include "rmr_symtab.h"
+#include "rmr_logging.h"
#include "ring_static.c" // message ring support
#include "rt_generic_static.c" // route table things not transport specific
errno = 0; // at this point any bad state is in msg returned
if( msg->header == NULL ) {
- fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
msg->state = RMR_ERR_NOHDR;
msg->tp_state = errno;
return msg;
memcpy( expected_id, msg->xaction, RMR_MAX_XID );
expected_id[RMR_MAX_XID] = 0; // ensure it's a string
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rmr_call is making call, waiting for (%s)\n", expected_id );
errno = 0;
msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
free( eps );
- return NULL;
+ ctx->eps = NULL;
+ if( old_msg != NULL ) {
+ old_msg->state = RMR_ERR_INITFAILED;
+ old_msg->tp_state = errno;
+ }
+ return old_msg;
}
eps->nng_fd = rmr_get_rcvfd( ctx );
if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
free( eps );
- return NULL;
+ ctx->eps = NULL;
+ if( old_msg != NULL ) {
+ old_msg->state = RMR_ERR_INITFAILED;
+ old_msg->tp_state = errno;
+ }
+ return old_msg;
}
ctx->eps = eps;
if( exp_len > RMR_MAX_XID ) {
exp_len = RMR_MAX_XID;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific waiting for id=%s\n", expect );
while( queued < allow2queue ) {
msg = rcv_msg( ctx, msg ); // hard wait for next
if( msg->state == RMR_OK ) {
if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
return msg;
}
if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific ring is full\n" );
errno = ENOBUFS;
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific queued message type=%d\n", msg->mtype );
queued++;
msg = NULL;
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific timeout waiting for %s\n", expect );
errno = ETIMEDOUT;
return NULL;
}
CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
*/
extern int rmr_set_rtimeout( void* vctx, int time ) {
- fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
+ rmr_vlog( RMR_VL_WARN, "Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
return 0;
}
char* tok; // pointer at token in a buffer
char* tok2;
int state;
+ int old_vlevel = 0;
+
+ old_vlevel = rmr_vlog_init(); // initialise and get the current level
+ rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
+ rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
//uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode
- fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
free_ctx( ctx );
return NULL;
}
free( tok );
} else {
if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
- fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+ free( proto_port );
return NULL;
}
if( (tok = strchr( wbuf, '.' )) != NULL ) {
ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
- fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
return NULL;
}
} else {
ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
if( ctx->my_ip == NULL ) {
- fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
- strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
+ rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
+ ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
if( *tok == '1' ) {
// rather than using this generic listen() call.
snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
- fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
nng_close( ctx->nn_sock );
+ free( proto_port );
free_ctx( ctx );
return NULL;
}
- if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc
- if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
- fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ if( flags & FL_NOTHREAD ) { // if no rtc thread, we still need an empty route table for wormholes
+ ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // so create one
+ } else {
+ if( (tok = getenv( ENV_RTG_RAW )) != NULL && *tok == '0' ) { // use RMR for Rmgr comm only when specifically off
+ if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rmr based rt collector thread
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ }
+ } else {
+ if( pthread_create( &ctx->rtc_th, NULL, raw_rtc, (void *) ctx ) ) { // kick the raw msg rt collector thread
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ }
}
}
if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running
ctx->flags |= CFL_MTC_ENABLED;
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver
- fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
}
}
if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+ rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
return -1;
}
mbuf = ombuf; // return caller's buffer if they passed one in
} else {
errno = 0; // interrupted call state could be left; clear
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_rcv extracting from normal ring\n" );
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
mbuf->state = RMR_OK;
return mbuf;
}
-/*
- Accept a message buffer and caller ID, send the message and then wait
- for the receiver to tickle the semaphore letting us know that a message
- has been received. The call_id is a value between 2 and 255, inclusive; if
- it's not in this range an error will be returned. Max wait is the amount
- of time in millaseconds that the call should block for. If 0 is given
- then no timeout is set.
- If the mt_call feature has not been initialised, then the attempt to use this
- funciton will fail with RMR_ERR_NOTSUPP
+/*
+ This does the real work behind both of the outward facing call functions. See
+ the rmr_mt_call() description for details modulo the comments blow.
- If no matching message is received before the max_wait period expires, a
- nil pointer is returned, and errno is set to ETIMEOUT. If any other error
- occurs after the message has been sent, then a nil pointer is returned
- with errno set to some other value.
+ If ep is given, then we skip the normal route table endpoint selection. This is
+ likely a wormhole call.
*/
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
rmr_mbuf_t* ombuf; // original mbuf passed in
uta_ctx_t* ctx;
uta_mhdr_t* hdr; // header in the transport buffer
seconds = 1; // use as flag later to invoked timed wait
}
- mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
+ if( ep != NULL ) {
+ mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
+ } else {
+ mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+ }
if( mbuf ) {
if( mbuf->state != RMR_OK ) {
mbuf->tp_state = errno;
}
}
- state = 0;
+ state = -1;
errno = 0;
while( chute->mbuf == NULL && ! errno ) {
if( seconds ) {
return NULL; // leave errno as set by sem wait call
}
- mbuf = chute->mbuf;
- mbuf->state = RMR_OK;
+ if( (mbuf = chute->mbuf) != NULL ) {
+ mbuf->state = RMR_OK;
+ }
chute->mbuf = NULL;
return mbuf;
}
+
+/*
+ Accept a message buffer and caller ID, send the message and then wait
+ for the receiver to tickle the semaphore letting us know that a message
+ has been received. The call_id is a value between 2 and 255, inclusive; if
+ it's not in this range an error will be returned. Max wait is the amount
+ of time in millaseconds that the call should block for. If 0 is given
+ then no timeout is set.
+
+ If the mt_call feature has not been initialised, then the attempt to use this
+ funciton will fail with RMR_ERR_NOTSUPP
+
+ If no matching message is received before the max_wait period expires, a
+ nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+ occurs after the message has been sent, then a nil pointer is returned
+ with errno set to some other value.
+
+ This is now just a wrapper to the real work horse so that we can provide
+ this and wormhole call functions without duplicating code.
+
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+ return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+/*
+ Given an existing message buffer, reallocate the payload portion to
+ be at least new_len bytes. The message header will remain such that
+ the caller may use the rmr_rts_msg() function to return a payload
+ to the sender.
+
+ The mbuf passed in may or may not be reallocated and the caller must
+ use the returned pointer and should NOT assume that it can use the
+ pointer passed in with the exceptions based on the clone flag.
+
+ If the clone flag is set, then a duplicated message, with larger payload
+ size, is allocated and returned. The old_msg pointer in this situation is
+ still valid and must be explicitly freed by the application. If the clone
+ message is not set (0), then any memory management of the old message is
+ handled by the function.
+
+ If the copy flag is set, the contents of the old message's payload is
+ copied to the reallocated payload. If the flag is not set, then the
+ contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+ if( old_msg == NULL ) {
+ return NULL;
+ }
+
+ return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
+}
+
+/*
+ The following functions are "dummies" as NNG has no concept of supporting
+ them, but are needed to resolve calls at link time.
+*/
+
+extern void rmr_set_fack( void* p ) {
+ return;
+}