-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 Nokia
+ Copyright (c) 2018-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
#include "rmr_agnostic.h" // agnostic things (must be included before private)
#include "rmr_nng_private.h" // things that we need too
#include "rmr_symtab.h"
+#include "rmr_logging.h"
#include "ring_static.c" // message ring support
#include "rt_generic_static.c" // route table things not transport specific
uta_ctx_t* ctx;
int state;
char* hold_src; // we need the original source if send fails
+ char* hold_ip; // also must hold original ip
int sock_ok = 0; // true if we found a valid endpoint socket
+ endpoint_t* ep; // end point to track counts
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
+ msg->tp_state = errno;
}
return msg;
}
errno = 0; // at this point any bad state is in msg returned
if( msg->header == NULL ) {
- fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
msg->state = RMR_ERR_NOHDR;
+ msg->tp_state = errno;
return msg;
}
((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
- if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); // default to IP based rts
- }
+
+ sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts
if( ! sock_ok ) {
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // IP not in rt, try name
+ if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
+ sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
+ }
if( ! sock_ok ) {
msg->state = RMR_ERR_NOENDPT;
return msg; // preallocated msg can be reused since not given back to nn
msg->state = RMR_OK; // ensure it is clear before send
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
+ hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
msg = send_msg( ctx, msg, nn_sock, -1 );
if( msg ) {
+ if( ep != NULL ) {
+ switch( msg->state ) {
+ case RMR_OK:
+ ep->scounts[EPSC_GOOD]++;
+ break;
+
+ case RMR_ERR_RETRY:
+ ep->scounts[EPSC_TRANS]++;
+ break;
+
+ default:
+ ep->scounts[EPSC_FAIL]++;
+ break;
+ }
+ }
strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
}
free( hold_src );
+ free( hold_ip );
return msg;
}
memcpy( expected_id, msg->xaction, RMR_MAX_XID );
expected_id[RMR_MAX_XID] = 0; // ensure it's a string
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rmr_call is making call, waiting for (%s)\n", expected_id );
errno = 0;
msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
if( msg->state != RMR_ERR_RETRY ) {
msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry
}
+ msg->tp_state = errno;
return msg;
}
rmr_mbuf_t* qm; // message that was queued on the ring
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
if( old_msg != NULL ) {
old_msg->state = RMR_ERR_BADARG;
+ old_msg->tp_state = errno;
}
- errno = EINVAL;
return old_msg;
}
errno = 0;
rmr_mbuf_t* msg;
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
if( old_msg != NULL ) {
old_msg->state = RMR_ERR_BADARG;
+ old_msg->tp_state = errno;
}
- errno = EINVAL;
return old_msg;
}
if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
free( eps );
- return NULL;
+ ctx->eps = NULL;
+ if( old_msg != NULL ) {
+ old_msg->state = RMR_ERR_INITFAILED;
+ old_msg->tp_state = errno;
+ }
+ return old_msg;
}
eps->nng_fd = rmr_get_rcvfd( ctx );
if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
free( eps );
- return NULL;
+ ctx->eps = NULL;
+ if( old_msg != NULL ) {
+ old_msg->state = RMR_ERR_INITFAILED;
+ old_msg->tp_state = errno;
+ }
+ return old_msg;
}
ctx->eps = eps;
nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout
if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours
msg->state = RMR_ERR_TIMEOUT;
+ msg->tp_state = errno;
} else {
return rcv_msg( ctx, msg ); // receive it and return it
}
int exp_len = 0; // length of expected ID
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
+ msg->tp_state = errno;
}
- errno = EINVAL;
return msg;
}
if( exp_len > RMR_MAX_XID ) {
exp_len = RMR_MAX_XID;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific waiting for id=%s\n", expect );
while( queued < allow2queue ) {
msg = rcv_msg( ctx, msg ); // hard wait for next
if( msg->state == RMR_OK ) {
if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
return msg;
}
if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific ring is full\n" );
errno = ENOBUFS;
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific queued message type=%d\n", msg->mtype );
queued++;
msg = NULL;
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific timeout waiting for %s\n", expect );
errno = ETIMEDOUT;
return NULL;
}
-// CAUTION: these are not supported as they must be set differently (between create and open) in NNG.
-// until those details are worked out, these generate a warning.
/*
- Set send timeout. The value time is assumed to be microseconds. The timeout is the
- rough maximum amount of time that RMr will block on a send attempt when the underlying
+ Set send timeout. The value time is assumed to be milliseconds. The timeout is the
+ _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
mechnism indicates eagain or etimeedout. All other error conditions are reported
without this delay. Setting a timeout of 0 causes no retries to be attempted in
- RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
- but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
- after every 10K send attempts until the time value is reached. Retries are abandoned
- if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
+ RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
+ but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
+ after every 1K send attempts until the "time" value is reached. Retries are abandoned
+ if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
The default, if this function is not used, is 1; meaning that RMr will retry, but will
not enter a sleep. In all cases the caller should check the status in the message returned
/*
Set receive timeout -- not supported in nng implementation
+
+ CAUTION: this is not supported because the timeout must be set differently (between create and open) in NNG.
*/
extern int rmr_set_rtimeout( void* vctx, int time ) {
- fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
+ rmr_vlog( RMR_VL_WARN, "Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" ); // vctx and time are not read; the call only logs this warning and then returns 0
return 0;
}
char* proto_port;
char wbuf[1024]; // work buffer
char* tok; // pointer at token in a buffer
+ char* tok2;
int state;
+ int old_vlevel = 0;
+
+ old_vlevel = rmr_vlog_init(); // initialise and get the current level
+ rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
- RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
+ rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
//uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode
- fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
free_ctx( ctx );
return NULL;
}
port = proto_port; // assume something like "1234" was passed
}
- if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
- fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
- return NULL;
- }
- if( (tok = strchr( wbuf, '.' )) != NULL ) {
- *tok = 0; // we don't keep domain portion
+ if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
+ tok = strdup( tok ); // something we can destroy
+ if( *tok == '[' ) { // we allow an ipv6 address here
+ tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
+ } else {
+ tok2 = strchr( tok, ':' ); // find :port if there so we can chop
+ }
+ if( tok2 && *tok2 ) { // if it's not the end of string marker
+ *tok2 = 0; // make it so
+ }
+
+ snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
+ free( tok );
+ } else {
+ if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+ free( proto_port );
+ return NULL;
+ }
+ if( (tok = strchr( wbuf, '.' )) != NULL ) {
+ *tok = 0; // we don't keep domain portion
+ }
}
+
ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
- fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
return NULL;
}
} else {
ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
if( ctx->my_ip == NULL ) {
- fprintf( stderr, "[WARN] rmr_init: default ip address could not be sussed out, using name\n" );
- strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
+ rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
+ ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
+ if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
+ if( *tok == '1' ) {
+ ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
+ }
+ }
if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
// rather than using this generic listen() call.
snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
- fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
nng_close( ctx->nn_sock );
+ free( proto_port );
free_ctx( ctx );
return NULL;
}
- if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc
- if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
- fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ if( flags & FL_NOTHREAD ) { // if no rtc thread, we still need an empty route table for wormholes
+ ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // so create one
+ } else {
+ if( (tok = getenv( ENV_RTG_RAW )) != NULL && *tok == '0' ) { // use RMR for Rmgr comm only when specifically off
+ if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rmr based rt collector thread
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ }
+ } else {
+ if( pthread_create( &ctx->rtc_th, NULL, raw_rtc, (void *) ctx ) ) { // kick the raw msg rt collector thread
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ }
}
}
if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) { // mt call support is on, must start the listener thread if not running
ctx->flags |= CFL_MTC_ENABLED;
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // kick the receiver
- fprintf( stderr, "[WARN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
}
}
if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+ rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
return -1;
}
errno = EINVAL;
if( mbuf ) {
mbuf->state = RMR_ERR_BADARG;
+ mbuf->tp_state = errno;
}
return mbuf;
}
errno = EINVAL;
if( mbuf != NULL ) {
mbuf->state = RMR_ERR_NOTSUPP;
+ mbuf->tp_state = errno;
}
return mbuf;
}
}
chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
-
- if( max_wait > 0 ) {
+
+ if( max_wait >= 0 ) {
clock_gettime( CLOCK_REALTIME, &ts );
if( max_wait > 999 ) {
- seconds = (max_wait - 999)/1000;
+ seconds = max_wait / 1000;
max_wait -= seconds * 1000;
ts.tv_sec += seconds;
}
seconds = 1; // use as flag later to invoked timed wait
}
- errno = 0;
- while( chute->mbuf == NULL && ! errno ) {
+ errno = EINTR;
+ state = -1;
+ while( state < 0 && errno == EINTR ) {
if( seconds ) {
state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
} else {
state = sem_wait( &chute->barrier );
}
-
- if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
- errno = 0;
- }
}
if( state < 0 ) {
mbuf = ombuf; // return caller's buffer if they passed one in
} else {
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+ errno = 0; // interrupted call state could be left; clear
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_rcv extracting from normal ring\n" );
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
- if( mbuf ) {
- mbuf->state = RMR_OK;
-
- if( ombuf ) {
- rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
- }
- } else {
- mbuf = ombuf; // no buffer, return user's if there
+ mbuf->state = RMR_OK;
+
+ if( ombuf ) {
+ rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
}
+ } else {
+ errno = ETIMEDOUT;
+ mbuf = ombuf; // no buffer, return user's if there
}
}
+ if( mbuf ) {
+ mbuf->tp_state = errno;
+ }
return mbuf;
}
long nano_sec; // max wait xlated to nano seconds
int state;
+ errno = EINVAL;
if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
- errno = EINVAL;
if( mbuf ) {
+ mbuf->tp_state = errno;
mbuf->state = RMR_ERR_BADARG;
}
return mbuf;
if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
mbuf->state = RMR_ERR_NOTSUPP;
+ mbuf->tp_state = errno;
return mbuf;
}
if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
mbuf->state = RMR_ERR_BADARG;
+ mbuf->tp_state = errno;
return mbuf;
}
d1[D1_CALLID_IDX] = (unsigned char) call_id; // set the caller ID for the response
mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
- if( max_wait > 0 ) {
+ if( max_wait >= 0 ) {
clock_gettime( CLOCK_REALTIME, &ts );
if( max_wait > 999 ) {
- seconds = (max_wait - 999)/1000;
+ seconds = max_wait / 1000;
max_wait -= seconds * 1000;
ts.tv_sec += seconds;
}
mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
if( mbuf ) {
if( mbuf->state != RMR_OK ) {
+ mbuf->tp_state = errno;
return mbuf; // timeout or unable to connect or no endpoint are most likely issues
}
}
+ state = -1;
errno = 0;
while( chute->mbuf == NULL && ! errno ) {
if( seconds ) {
return NULL; // leave errno as set by sem wait call
}
- mbuf = chute->mbuf;
- mbuf->state = RMR_OK;
+ if( (mbuf = chute->mbuf) != NULL ) {
+ mbuf->state = RMR_OK;
+ }
chute->mbuf = NULL;
return mbuf;
}
+
+/*
+ Given an existing message buffer, reallocate the payload portion to
+ be at least new_len bytes. The message header will remain such that
+ the caller may use the rmr_rts_msg() function to return a payload
+ to the sender.
+
+ The mbuf passed in may or may not be reallocated, so the caller must
+ use the returned pointer and should NOT assume that the pointer passed
+ in is still usable; the one exception is described by the clone flag below.
+
+ If the clone flag is set, then a duplicated message, with larger payload
+ size, is allocated and returned. The old_msg pointer in this situation is
+ still valid and must be explicitly freed by the application. If the clone
+ message is not set (0), then any memory management of the old message is
+ handled by the function.
+
+ If the copy flag is set, the contents of the old message's payload are
+ copied to the reallocated payload. If the flag is not set, then the
+ contents of the payload are undefined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+	// A nil message yields nil. Otherwise the work is handed straight to
+	// realloc_payload() because message allocation is transport specific.
+	return old_msg != NULL ? realloc_payload( old_msg, new_len, copy, clone ) : NULL;
+}
+
+/*
+ The following functions are "dummies" as NNG has no concept of supporting
+ them, but are needed to resolve calls at link time.
+*/
+
+extern void rmr_set_fack( void* p ) {
+	(void) p;		// deliberately a no-op: the argument is ignored and nothing is changed
+}