--- /dev/null
+// vim: ts=4 sw=4 noet:
+/*
+--------------------------------------------------------------------------------
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+--------------------------------------------------------------------------------
+*/
+
+/*
+ Mnemonic: mcl.c.
+ Abstract: The mc listener library content. All external functions
+ should start with mcl_ and all stderr messages should have
+ (mcl) as the first token following the severity indicator.
+
+ Date: 22 August 2019
+ Author: E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+
+#include <rmr/rmr.h>
+#include <rmr/rmr_symtab.h>
+
+#include "mcl.h"
+
+#define READER 0
+#define WRITER 1
+
+#define TRUE 1
+#define FALSE 0
+
+/*
+ Information about one file descriptor. This is pointed to by the hash
+ such that the message type can be used as a key to look up the fifo's
+ file descriptor.
+*/
+typedef struct {
+ int fd; // open fdes
+ int key; // symtab key
+ long long wcount; // number of writes
+ long long drops; // number dropped
+
+ long long wcount_rp; // number of writes during last reporting period
+ long long drops_rp; // number dropped during last reporting period
+} fifo_t;
+
+/*
+ Our conext. Pointers to the read and write hash tables (both keyed on the message
+ type), the message router (RMR) context, and other goodies.
+*/
+typedef struct {
+ void* mrc; // the message router's context
+ void* wr_hash; // symtable to look up pipe info based on mt for writing
+ void* rd_hash; // we support reading from pipes, but need a different FD for that
+ char* fifo_dir; // directory where we open fifos
+
+} mcl_ctx_t;
+
+// -------- private -------------------------------------------------------
+
+
+/*
+ Set up for raw data capture. We look for directory overriedes from
+ environment variables, and then invoke the rdc_init() to actually
+ set things upd.
+*/
+static void* setup_rdc() {
+ void* ctx;
+ int value; // value computed for something
+ char* ep; // pointer to environment var
+ char* sdir = "/tmp/rdc/stage"; // default directory names
+ char* fdir = "/tmp/rdc/final";
+ char* suffix = ".rdc";
+ char* done = NULL;
+
+ if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
+ if( ep != NULL && atoi( ep ) == 0 ) {
+ logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
+ return NULL;
+ }
+ }
+
+ if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
+ sdir = ep;
+ } else {
+ mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist
+ mkdir( sdir, 0755 );
+ }
+
+ if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
+ fdir = ep;
+ } else {
+ mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there
+ mkdir( fdir, 0755 );
+ }
+
+ if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
+ suffix = ep;
+ }
+
+ if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
+ done = ep;
+ }
+
+ ctx = rdc_init( sdir, fdir, suffix, done );
+ if( ctx == NULL ) {
+ logit( LOG_ERR, "rdc_init did not generate a context" );
+ } else {
+ logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
+ logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
+ }
+
+ if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
+ value = atoi( ep );
+ logit( LOG_INFO, "setting frequency: %d", value );
+ rdc_set_freq( ctx, value );
+ }
+ return ctx;
+}
+
+/*
+ Builds an extended header in the buffer provided, or allocates a new buffer if
+ dest is nil. The header is of the form:
+ <delim><len><timestamp>
+
+ Timestamp is a single unsigned long long in ASCII; ms since epoch.
+ If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
+ the timestamp generated will be 1570113591103.
+*/
+static char* build_hdr( int len, char* dest, int dest_len ) {
+ struct timespec ts; // time just before call executed
+
+ if( dest == NULL ) {
+ dest_len = 48;
+ dest = (char *) malloc( sizeof( char ) * dest_len );
+ } else {
+ if( dest_len < 28 ) { // shouldn't happen, but take no chances
+ memset( dest, 0, dest_len );
+ return NULL;
+ }
+ }
+
+ memset( dest, 0, dest_len );
+
+ clock_gettime( CLOCK_REALTIME, &ts );
+ sprintf( dest, "%s%07d", MCL_DELIM, len );
+ sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
+
+ return dest;
+}
+
+/*
+ Build a file name and open. The io_direction is either READER or
+ WRITER. For a writer we must 'trick' the system into allowing us
+ to open a pipe for writing in non-blocking mode so that we can
+ report on drops (messages we couldn't write because there was no
+ reader). The trick is to open a reader on the pipe so that when
+ we open the writer there's a reader and the open won't fail. As
+ soon as we have the writer open, we can close the junk reader.
+
+ If the desired fifo does not exist, it is created.
+*/
+static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
+ char wbuf[1024];
+ int fd; // real file des
+ int jfd = -1; // junk file des
+ int state;
+
+ if( ctx == NULL || mtype < 0 ) {
+ return -1;
+ }
+
+ snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
+
+ state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok
+ if( state != 0 && errno != EEXIST ) {
+ logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
+ return -1;
+ }
+
+ if( io_dir == READER ) {
+ fd = open( wbuf, O_RDONLY ); // just open the reader
+ if( fd < 0 ) {
+ logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
+ }
+ } else {
+ jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer
+ if( jfd < 0 ) {
+ logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
+ }
+
+ fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode
+ if( fd < 0 ) {
+ logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
+ }
+
+ close( jfd ); // should be safe to close this
+ }
+
+
+ return fd;
+}
+
+/*
+ Given a message type, return the file des of the fifo that
+ the payload should be written to. Returns the file des, or -1
+ on error. When sussing out a read file descriptor this will
+ block until there is a fifo for the message type which is
+ open for reading.
+
+ If fref is not nil, then a pointer to the fifo info block is returned
+ allowing for direct update of counts after the write.
+*/
+static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
+ fifo_t* fifo;
+ void* hash;
+
+ if( io_dir == READER ) { // with an integer key, we nned two hash tables
+ hash = ctx->rd_hash;
+ } else {
+ hash = ctx->wr_hash;
+ }
+
+ if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
+ fifo = (fifo_t *) malloc( sizeof( *fifo ) );
+ if( fifo == NULL ) {
+ return -1;
+ }
+
+ memset( fifo, 0, sizeof( *fifo ) );
+ fifo->key = mtype;
+ fifo->fd = open_fifo( ctx, mtype, io_dir );
+ rmr_sym_map( hash, mtype, fifo );
+ }
+
+ if( fref != NULL ) {
+ *fref = fifo;
+ }
+ return fifo->fd;
+}
+
+/*
+ Make marking counts easier in code
+*/
+static inline void chalk_error( fifo_t* fifo ) {
+ if( fifo != NULL ) {
+ fifo->drops++;
+ fifo->drops_rp++;
+ }
+}
+
+static inline void chalk_ok( fifo_t* fifo ) {
+ if( fifo != NULL ) {
+ fifo->wcount++;
+ fifo->wcount_rp++;
+ }
+}
+
+/*
+ Callback function driven to traverse the symtab and generate the
+ counts for each fifo.
+*/
+static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
+ fifo_t* fifo;
+ int report_period = 60;
+
+ if( data ) {
+ report_period = *((int *) data);
+ }
+
+ if( (fifo = (fifo_t *) thing) != NULL ) {
+ logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
+ fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
+
+ fifo->wcount_rp = 0; // reset the report counts
+ fifo->drops_rp = 0;
+ }
+}
+
+// ---------- public ------------------------------------------------------
+/*
+ Sets a signal handler for sigpipe so we don't crash if a reader closes the
+ last reading fd on a pipe. We could do this automatically, but if the user
+ programme needs to trap sigpipe too, this gives them the option not to have
+ us interfere.
+*/
+extern int mcl_set_sigh( ) {
+ signal( SIGPIPE, SIG_IGN );
+}
+
+/*
+ "Opens" the interface to RMR such that messages sent to the application will
+ be available via the rmr receive funcitons. This is NOT automatically called
+ by the mk_context function as some applications will be using the mc library
+ for non-RMR, fifo, chores.
+*/
+extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) {
+ mcl_ctx_t* ctx;
+ int announce = 0;
+
+ if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
+ return 0;
+ }
+
+ ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
+ if( ctx->mrc == NULL ) {
+ logit( LOG_CRIT, "start listening: unable to initialise RMr" );
+ return 0;
+ }
+
+ while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait
+ if( announce <= 0 ) {
+ logit( LOG_INFO, "waiting for RMR to show ready" );
+ announce = 10;
+ } else {
+ announce--;
+ }
+
+ sleep( 1 );
+ }
+
+ return 1;
+}
+
+/*
+ Blocks until a message arives with a good return code or we timeout. Returns the
+ rmr message buffer. Timeout value epxected in seconds.
+*/
+extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
+ mcl_ctx_t* ctx;
+
+ if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
+ return NULL;
+ }
+
+ if( ctx->mrc == NULL ) {
+ logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
+ exit( 1 );
+ }
+
+ do {
+ msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next
+ } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
+
+ return msg;
+}
+
+/*
+ Create the context.
+*/
+extern void* mcl_mk_context( char* dir ) {
+ mcl_ctx_t* ctx;
+
+ if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
+ memset( ctx, 0, sizeof( *ctx ) );
+ ctx->fifo_dir = strdup( dir );
+ ctx->wr_hash = rmr_sym_alloc( 1001 );
+ ctx->rd_hash = rmr_sym_alloc( 1001 );
+
+ if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) {
+ logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
+ free( ctx );
+ return NULL;
+ }
+ }
+
+ return (void *) ctx;
+}
+
+/*
+ Read the header. Best case we read the expected number of bytes, get all
+ of them and find that they start with the delemiter. Worst case
+ We have to wait for all of the header, or need to synch at the next
+ delimeter. We assume best case most likely and handle it as such.
+*/
+static void read_header( int fd, char* buf ) {
+ int len;
+ int need = MCL_EXHDR_SIZE; // total needed
+ int dneed; // delimieter needed
+ int rlen;
+ char* rp; // read position in buf
+
+ len = read( fd, buf, need );
+ if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely
+ return;
+ }
+
+ while( TRUE ) {
+ if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
+ rp = buf + len;
+ dneed = strlen( MCL_DELIM ) - len;
+
+ while( dneed > 0 ) {
+ len = read( fd, rp, dneed );
+ dneed -= len;
+ rp += len;
+ }
+ }
+
+ if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes
+ need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
+ rp = buf + (MCL_EXHDR_SIZE - need);
+
+ while( need > 0 ) {
+ len = read( fd, rp, need );
+ need -= len;
+ rp += len;
+ }
+
+ return;
+ }
+
+ while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
+ len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
+ }
+
+ need = MCL_EXHDR_SIZE - len;
+ }
+}
+
+
+/*
+ Read one record from the fifo that the message type maps to.
+ Writes at max ublen bytes into the ubuf.
+
+ If long_hdrs is true (!0), then we expect that the stream in the fifo
+ has extended headers (<delim><len><time>), and will write the timestamp
+ from the header into the buffer pointed to by timestamp. The buffer is
+ assumed to be at least MCL_TSTAMP_SIZE bytes in length.
+
+ Further, when extended headers are being used, this function will
+ automatically resynchronise if it detects an issue.
+
+ The function could look for the delimiter and automatically detect whether
+ or not extended headers are in use, but if the stream is out of synch on the
+ first read, this cannot be done, so the funciton requires that the caller
+ know that the FIFO contains extended headers.
+*/
+static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
+ int fd;
+ int len;
+ int msg_len;
+ int got = 0; // number of bytes we actually got
+ int need;
+ char wbuf[4096];
+ mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
+ fifo_t* fref = NULL; // the fifo struct we found
+
+ if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
+ errno = EINVAL;
+ return 0;
+ }
+
+ if( (fd = suss_fifo( ctx, mtype, READER, NULL )) >= 0 ) {
+ if( long_hdrs ) {
+ read_header( fd, wbuf );
+ msg_len = need = atoi( wbuf + MCL_LEN_OFF ); // read the length
+ if( timestamp ) {
+ strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
+ }
+ } else {
+ if( timestamp != NULL ) { // won't be there, but ensure it's not garbage
+ *timestamp = 0;
+ }
+
+ len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream
+ msg_len = need = atoi( wbuf );
+ }
+
+
+ if( need > ublen ) {
+ need = ublen; // cannot give them more than they can take
+ }
+ while( need > 0 ) {
+ len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
+ memcpy( ubuf+got, wbuf, len );
+ got += len;
+ need -= len;
+ }
+
+ if( msg_len > got ) { // we must ditch rest of this message
+ need = msg_len = got;
+ while( need > 0 ) {
+ len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
+ need -= len;
+ }
+ }
+
+ return got;
+ }
+
+ errno = EBADFD;
+ return 0;
+}
+
+/*
+ Read one record from the fifo that the message type maps to.
+ Writes at max ublen bytes into the ubuf. If extended headers are in use
+ this function will ignore the timestamp.
+
+ If long_hdrs is true (!0), then we expect that the stream in the fifo
+ has extended headers (<delim><len><time>).
+*/
+extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
+ return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
+}
+
+/*
+ Read a single message from the FIFO returning it in the caller's buffer. If extended
+ headers are being used, and the caller supplied a timestamp buffer, the timestamp
+ which was in the header will be returned in that buffer. The return value is the number
+ of bytes in the buffer; 0 indicates an error and errno should be set.
+*/
+extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
+ return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
+}
+
+
+/*
+ Will read messages and fan them out based on the message type. This should not
+ return and if it does the caller should assume an error.
+
+ The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
+ string , followed by that number of 'raw' bytes. The raw bytes are the payload
+ exactly as received.
+
+ The report parameter is the frequency, in seconds, for writing a short
+ status report to stdout. If 0 then it's off.
+
+ If long_hdr is true, then we geneate an extended header with a delimiter and
+ timestamp.
+*/
+extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
+ mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
+ fifo_t* fifo; // fifo to chalk counts on
+ rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
+ char header[128]; // header we'll pop in front of the payload
+ int state;
+ int fd; // file des to write to
+ long long total = 0; // total messages received and written
+ long long total_drops = 0; // total messages received and written
+ long count = 0; // messages received and written during last reporting period
+ long errors = 0; // unsuccessful payload writes
+ long drops; // number of drops
+ time_t next_report = 0; // we'll report every 2 seconds if report is true
+ time_t now;
+ int hwlen; // write len for header
+ void* rdc_ctx = NULL; // raw data capture context
+ void* rdc_buf = NULL; // capture buffer
+
+ if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
+ logit( LOG_ERR, "(mcl) invalid context given to fanout" );
+ errno = EINVAL;
+ return;
+ }
+
+ if( report < 0 ) {
+ report = 0;
+ }
+
+ rdc_ctx = setup_rdc( ); // pull rdc directories from enviornment and initialise
+
+ while( 1 ) {
+ mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
+
+ if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0 ) {
+ fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
+ if( fd >= 0 ) {
+ if( long_hdr ) {
+ build_hdr( mbuf->len, header, sizeof( header ) );
+ hwlen = MCL_EXHDR_SIZE;
+ } else {
+ snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+ hwlen = MCL_LEN_SIZE;
+ }
+
+ if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
+ drops++;
+ total_drops++;
+ chalk_error( fifo );
+ } else {
+ if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
+ errors++;
+ chalk_error( fifo );
+ } else {
+ chalk_ok( fifo );
+ count++;
+ total++;
+ }
+ }
+ }
+
+ if( rdc_ctx != NULL ) {
+ rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
+ rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
+ }
+ }
+
+ if( report ) {
+ if( (now = time( NULL ) ) > next_report ) {
+ rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
+ fflush( stdout );
+
+ logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
+ total, total_drops, report, count, drops, errors );
+ next_report = now + report;
+ count = 0;
+ drops = 0;
+
+ fflush( stdout );
+ }
+ }
+ }
+}
+
+