Abstract: The mc listener library content. All external functions
should start with mcl_ and all stderr messages should have
(mcl) as the first token following the severity indicator.
-
+
Date: 22 August 2019
Author: E. Scott Daniels
*/
#include <fcntl.h>
#include <signal.h>
#include <sys/stat.h>
+#include <sys/types.h>
+
#include <rmr/rmr.h>
#include <rmr/rmr_symtab.h>
#define TRUE 1
#define FALSE 0
-
/*
Information about one file descriptor. This is pointed to by the hash
such that the message type can be used as a key to look up the fifo's
void* wr_hash; // symtable to look up pipe info based on mt for writing
void* rd_hash; // we support reading from pipes, but need a different FD for that
char* fifo_dir; // directory where we open fifos
-
+
} mcl_ctx_t;
// -------- private -------------------------------------------------------
+/*
+ Set up for raw data capture. We look for directory overriedes from
+ environment variables, and then invoke the rdc_init() to actually
+ set things upd.
+*/
+static void* setup_rdc() {
+ void* ctx;
+ int value; // value computed for something
+ char* ep; // pointer to environment var
+ char* sdir = "/tmp/rdc/stage"; // default directory names
+ char* fdir = "/tmp/rdc/final";
+ char* suffix = ".rdc";
+ char* done = NULL;
+
+ if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
+ if( ep != NULL && atoi( ep ) == 0 ) {
+ logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
+ return NULL;
+ }
+ }
+
+ if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
+ sdir = ep;
+ } else {
+ mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist
+ mkdir( sdir, 0755 );
+ }
+
+ if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
+ fdir = ep;
+ } else {
+ mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there
+ mkdir( fdir, 0755 );
+ }
+
+ if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
+ suffix = ep;
+ }
+
+ if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
+ done = ep;
+ }
+
+ ctx = rdc_init( sdir, fdir, suffix, done );
+ if( ctx == NULL ) {
+ logit( LOG_ERR, "rdc_init did not generate a context" );
+ } else {
+ logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
+ logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
+ }
+
+ if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
+ value = atoi( ep );
+ logit( LOG_INFO, "setting frequency: %d", value );
+ rdc_set_freq( ctx, value );
+ }
+ return ctx;
+}
+
/*
Builds an extended header in the buffer provided, or allocates a new buffer if
dest is nil. The header is of the form:
return dest;
}
-/*
+/*
Build a file name and open. The io_direction is either READER or
WRITER. For a writer we must 'trick' the system into allowing us
- to open a pipe for writing in non-blocking mode so that we can
+ to open a pipe for writing in non-blocking mode so that we can
report on drops (messages we couldn't write because there was no
reader). The trick is to open a reader on the pipe so that when
we open the writer there's a reader and the open won't fail. As
}
snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
-
+
state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok
if( state != 0 && errno != EEXIST ) {
- fprintf( stderr, "[ERR] (mcl) unable to create fifo: %s: %s\n", wbuf, strerror( errno ) );
+ logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
return -1;
}
if( io_dir == READER ) {
fd = open( wbuf, O_RDONLY ); // just open the reader
if( fd < 0 ) {
- fprintf( stderr, "[ERR] (mcl) fifo open failed (ro): %s: %s\n", wbuf, strerror( errno ) );
+ logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
}
} else {
jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer
if( jfd < 0 ) {
- fprintf( stderr, "[ERR] (mcl) fifo open failed (rw): %s: %s\n", wbuf, strerror( errno ) );
+ logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
}
-
+
fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode
if( fd < 0 ) {
- fprintf( stderr, "[ERR] (mcl) fifo open failed (wo): %s: %s\n", wbuf, strerror( errno ) );
+ logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
}
close( jfd ); // should be safe to close this
}
/*
- Callback function driven to traverse the symtab and generate the
+ Callback function driven to traverse the symtab and generate the
counts for each fifo.
*/
static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
}
if( (fifo = (fifo_t *) thing) != NULL ) {
- fprintf( stdout, "[STAT] (mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld\n",
+ logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
fifo->wcount_rp = 0; // reset the report counts
ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
if( ctx->mrc == NULL ) {
- fprintf( stderr, "[CRIT] unable to initialise RMr\n" );
+ logit( LOG_CRIT, "start listening: unable to initialise RMr" );
return 0;
}
while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait
if( announce <= 0 ) {
- fprintf( stderr, "[INFO] waiting for RMR to show ready\n" );
+ logit( LOG_INFO, "waiting for RMR to show ready" );
announce = 10;
} else {
announce--;
}
/*
- Blocks until a message arives with a good return code or we timeout. Returns the
+ Blocks until a message arives with a good return code or we timeout. Returns the
rmr message buffer. Timeout value epxected in seconds.
*/
extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
}
if( ctx->mrc == NULL ) {
- fprintf( stderr, "bad context\n" );
+ logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
exit( 1 );
}
ctx->rd_hash = rmr_sym_alloc( 1001 );
if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) {
- fprintf( stderr, "[ERR] (mcl) unable to allocate hash table for fifo keys\n" );
+ logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
free( ctx );
return NULL;
}
if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely
return;
}
-
+
while( TRUE ) {
if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
rp = buf + len;
dneed = strlen( MCL_DELIM ) - len;
-
+
while( dneed > 0 ) {
len = read( fd, rp, dneed );
dneed -= len;
if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes
need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
rp = buf + (MCL_EXHDR_SIZE - need);
-
+
while( need > 0 ) {
len = read( fd, rp, need );
need -= len;
The function could look for the delimiter and automatically detect whether
or not extended headers are in use, but if the stream is out of synch on the
first read, this cannot be done, so the funciton requires that the caller
- know that the FIFO contains extended headers.
+ know that the FIFO contains extended headers.
*/
static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
int fd;
msg_len = need = atoi( wbuf );
}
-
+
if( need > ublen ) {
need = ublen; // cannot give them more than they can take
}
while( need > 0 ) {
- len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
+ len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
memcpy( ubuf+got, wbuf, len );
got += len;
need -= len;
if( msg_len > got ) { // we must ditch rest of this message
need = msg_len = got;
while( need > 0 ) {
- len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
+ len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
need -= len;
}
}
Read a single message from the FIFO returning it in the caller's buffer. If extended
headers are being used, and the caller supplied a timestamp buffer, the timestamp
which was in the header will be returned in that buffer. The return value is the number
- of bytes in the buffer; 0 indicates an error and errno should be set.
+ of bytes in the buffer; 0 indicates an error and errno should be set.
*/
extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
/*
Will read messages and fan them out based on the message type. This should not
- return and if it does the caller should assume an error.
+ return and if it does the caller should assume an error.
The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
- string , followed by that number of 'raw' bytes. The raw bytes are the payload
+ string , followed by that number of 'raw' bytes. The raw bytes are the payload
exactly as received.
The report parameter is the frequency, in seconds, for writing a short
mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
fifo_t* fifo; // fifo to chalk counts on
rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
- char wbuf[128]; // buffer to build len string in
+ char header[128]; // header we'll pop in front of the payload
int state;
int fd; // file des to write to
long long total = 0; // total messages received and written
time_t next_report = 0; // we'll report every 2 seconds if report is true
time_t now;
int hwlen; // write len for header
+ void* rdc_ctx = NULL; // raw data capture context
+ void* rdc_buf = NULL; // capture buffer
if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
- fprintf( stderr, "[ERR] (mcl) invalid context given to fanout\n" );
+ logit( LOG_ERR, "(mcl) invalid context given to fanout" );
errno = EINVAL;
return;
}
report = 0;
}
+ rdc_ctx = setup_rdc( ); // pull rdc directories from enviornment and initialise
+
while( 1 ) {
mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
if( fd >= 0 ) {
if( long_hdr ) {
- build_hdr( mbuf->len, wbuf, sizeof( wbuf ) );
+ build_hdr( mbuf->len, header, sizeof( header ) );
hwlen = MCL_EXHDR_SIZE;
} else {
- snprintf( wbuf, sizeof( wbuf ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+ snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
hwlen = MCL_LEN_SIZE;
}
- if( (state = write( fd, wbuf, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
+ if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
drops++;
total_drops++;
chalk_error( fifo );
} else {
- if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
+ if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
errors++;
chalk_error( fifo );
} else {
}
}
}
+
+ if( rdc_ctx != NULL ) {
+ rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
+ rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
+ }
}
if( report ) {
rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
fflush( stdout );
- fprintf( stdout, "[STAT] (mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors\n",
+ logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
total, total_drops, report, count, drops, errors );
next_report = now + report;
count = 0;