char* done = NULL;
if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) { // exists and is 0
- logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" );
+ logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)" );
return NULL;
}
<delim><len><timestamp>
Field lengths (bytes) are:
- delim 4
+ delim 4
len 8 (7 digits + 0)
timestamp 16 (15 digits + 0)
fifo = NULL;
}
}
+ } else {
+ if( fifo->fd < 0 ) { // it existed, but was closed; reopen
+ fifo->fd = open_fifo( ctx, mtype, io_dir );
+ }
}
if( fref != NULL ) {
return fifo == NULL ? -1 : fifo->fd;
}
+/*
+ Should we need to close a FIFO we do so and leave the block in the hash
+ with a bad FD so that we'll attempt to reopen on next use.
+*/
+static void close_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
+ fifo_t* fifo;
+ void* hash;
+
+ if( ctx == NULL ) {
+ return;
+ }
+
+ if( io_dir == READER ) { // with an integer key, we need two hash tables
+ hash = ctx->rd_hash;
+ } else {
+ hash = ctx->wr_hash;
+ }
+
+ if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) != NULL ) {
+ if( fifo->fd >= 0 ) {
+ close( fifo->fd );
+ fifo->fd = -1;
+ }
+ }
+}
+
/*
Make marking counts easier in code
*/
}
}
+/*
+ Writes the indicated bytes (n2write) from buf onto the fd. Returns only after
+ full buffer is written, or there is a hard error (not eagain or eintr).
+ Returns the number written; if less than n2write the caller may assume
+ that there was a hard error and errno should reflect.
+*/
+static inline int write_all( int fd, char const* buf, int n2write ) {
+ ssize_t remain = 0; // number of bytes remaining to write
+ ssize_t wrote = 0; // number of bytes written thus far
+ ssize_t state = 0;
+
+ if( fd < 0 ) {
+ errno = EBADFD;
+ return 0;
+ }
+
+ errno = 0;
+ remain = n2write;
+ do {
+ if( (state = write( fd, buf + wrote, remain )) > 0 ) {
+ wrote += state;
+ remain = n2write - wrote;
+ }
+ } while( remain > 0 && (errno == EINTR || errno == EAGAIN) ) ;
+
+ return wrote;
+}
+
+/*
+ Similar to write_all, this will write all bytes in the buffer, but
+ will return failure if the first write attempt fails with 0 written
+ (assuming that the pipe has no reader). We use this when writing the
+ header bytes; we want to drop the message if we can't even write one
+ byte, but if we write one, we must loop until all are written.
+
+ Returns the number written. If that value is less than n2write, then
+ the caller may assume a hard error occurred and errno should reflect.
+ If 0 is returned it can be assumed that the FIFO would block/has no
+ reader.
+*/
+static inline int write_all_nb( int fd, char const* buf, int n2write ) {
+ ssize_t remain = 0; // number of bytes remaining to write
+ ssize_t wrote = 0; // number of bytes written
+
+ if( fd < 0 ) {
+ errno = EBADFD;
+ return 0;
+ }
+
+ errno = 0;
+ remain = n2write;
+ wrote = write( fd, buf, remain );
+ if( wrote < 0 ) { // report error with exception for broken pipe
+ return errno == EPIPE ? 0 : -1; // broken pipe we assume no reader and return 0 since nothing written
+ }
+
+ if( wrote < n2write && wrote > 0 ) { // if we wrote anything, we must tough it out and write all if it was short
+ wrote += write_all( fd, buf + wrote, n2write - wrote );
+ }
+
+ return wrote;
+}
+
// ---------- public ------------------------------------------------------
/*
Sets a signal handler for sigpipe so we don't crash if a reader closes the
time_t next_report = 0; // we'll report every 2 seconds if report is true
time_t now;
size_t hwlen; // write len for header
+ size_t wrote; // number of bytes actually written
void* rdc_ctx = NULL; // raw data capture context
void* rdc_buf = NULL; // capture buffer
hwlen = MCL_LEN_SIZE;
}
- fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
+ fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
if( fd >= 0 ) {
- if( write( fd, header, hwlen ) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
+ if( (wrote = write_all_nb( fd, header, hwlen )) == 0 ) { // write header; 0 indicates no reader, drop silently
drops++;
total_drops++;
chalk_error( fifo );
} else {
- if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
+ if( wrote != hwlen ) {
+ logit( LOG_ERR, "(mcl): error writing header to fifo; mt=%d wrote=%d tried=%d: %s", mbuf->mtype, wrote, hwlen, strerror( errno ) );
errors++;
chalk_error( fifo );
+ close_fifo( ctx, mbuf->mtype, WRITER );
} else {
- chalk_ok( fifo );
- count++;
- total++;
+ if( write_all( fd, mbuf->payload, mbuf->len ) != mbuf->len ) { // we wrote a header, so we must write all; no drop at this point
+ logit( LOG_ERR, "(mcl): error writing payload to fifo; mt=%d: %s\n", mbuf->mtype, strerror( errno ) );
+ close_fifo( ctx, mbuf->mtype, WRITER );
+ } else {
+ chalk_ok( fifo );
+ count++;
+ total++;
+ }
}
}
}
- if( rdc_ctx != NULL ) {
+ if( rdc_ctx != NULL ) { // always put the message to the rdc files if collecting; eve if pipe write failed
rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
}