Reorganise the sidecars
[ric-app/mc.git] / sidecars / listener / mcl.c
diff --git a/sidecars/listener/mcl.c b/sidecars/listener/mcl.c
new file mode 100644 (file)
index 0000000..78908d9
--- /dev/null
@@ -0,0 +1,637 @@
+// vim: ts=4 sw=4 noet:
+/*
+--------------------------------------------------------------------------------
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+--------------------------------------------------------------------------------
+*/
+
+/*
+       Mnemonic:       mcl.c.
+       Abstract:       The mc listener library content. All external functions
+                               should start with mcl_ and all stderr messages should have
+                               (mcl) as the first token following the severity indicator.
+
+       Date:           22 August 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+
+#include <rmr/rmr.h>
+#include <rmr/rmr_symtab.h>
+
+#include "mcl.h"
+
+#define READER 0
+#define WRITER 1
+
+#define TRUE   1
+#define FALSE  0
+
+/*
+       Information about one file descriptor. This is pointed to by the hash
+       such that the message type can be used as a key to look up the fifo's
+       file descriptor.
+*/
+typedef struct {
+       int     fd;                                     // open fdes
+       int key;                                // symtab key
+       long long wcount;               // number of writes
+       long long drops;                // number dropped
+
+       long long wcount_rp;    // number of writes during last reporting period
+       long long drops_rp;             // number dropped during last reporting period
+} fifo_t;
+
+/*
+       Our conext.  Pointers to the read and write hash tables (both keyed on the message
+       type), the message router (RMR) context, and other goodies.
+*/
+typedef struct {
+       void*   mrc;                            // the message router's context
+       void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
+       void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
+       char*   fifo_dir;                       // directory where we open fifos
+
+} mcl_ctx_t;
+
+// -------- private -------------------------------------------------------
+
+
+/*
+       Set up for raw data capture. We look for directory overriedes from
+       environment variables, and then invoke the rdc_init() to actually
+       set things upd.
+*/
+static void* setup_rdc() {
+       void*   ctx;
+       int             value;                                                  // value computed for something
+       char*   ep;                                                             // pointer to environment var
+       char*   sdir = "/tmp/rdc/stage";                // default directory names
+       char*   fdir = "/tmp/rdc/final";
+       char*   suffix = ".rdc";
+       char*   done = NULL;
+
+       if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
+               if( ep != NULL  && atoi( ep ) == 0 ) {
+                       logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
+                       return NULL;
+               }
+       }
+
+       if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
+               sdir = ep;
+       } else {
+               mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
+               mkdir( sdir, 0755 );
+       }
+
+       if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
+               fdir = ep;
+       } else {
+               mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
+               mkdir( fdir, 0755 );
+       }
+
+       if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
+               suffix = ep;
+       }
+
+       if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
+               done = ep;
+       }
+
+       ctx = rdc_init( sdir, fdir, suffix, done );
+       if( ctx == NULL ) {
+               logit( LOG_ERR, "rdc_init did not generate a context" );
+       } else {
+               logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
+               logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
+       }
+
+       if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
+               value = atoi( ep );
+               logit( LOG_INFO, "setting frequency: %d", value );
+               rdc_set_freq( ctx, value );     
+       }
+       return ctx;
+}
+
+/*
+       Builds an extended header in the buffer provided, or allocates a new buffer if
+       dest is nil. The header is of the form:
+               <delim><len><timestamp>
+
+       Timestamp is a single unsigned long long in ASCII; ms since epoch.
+       If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
+       the timestamp generated will be 1570113591103.
+*/
+static char* build_hdr( int len, char* dest, int dest_len ) {
+       struct timespec ts;         // time just before call executed
+
+       if( dest == NULL ) {
+               dest_len = 48;
+               dest = (char *) malloc( sizeof( char ) * dest_len );
+       } else {
+               if( dest_len < 28 ) {           // shouldn't happen, but take no chances
+                       memset( dest, 0, dest_len );
+                       return NULL;
+               }
+       }
+
+       memset( dest, 0, dest_len );
+
+       clock_gettime( CLOCK_REALTIME, &ts );
+       sprintf( dest, "%s%07d", MCL_DELIM, len );
+       sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
+
+       return dest;
+}
+
+/*
+       Build a file name and open. The io_direction is either READER or
+       WRITER.  For a writer we must 'trick' the system into allowing us
+       to open a pipe for writing in non-blocking mode so that we can
+       report on drops (messages we couldn't write because there was no
+       reader).  The trick is to open a reader on the pipe so that when
+       we open the writer there's a reader and the open won't fail. As
+       soon as we have the writer open, we can close the junk reader.
+
+       If the desired fifo does not exist, it is created.
+*/
+static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
+       char    wbuf[1024];
+       int             fd;                             // real file des
+       int             jfd = -1;                       // junk file des
+       int             state;
+
+       if( ctx == NULL || mtype < 0 ) {
+               return -1;
+       }
+
+       snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
+
+       state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
+       if( state != 0 && errno != EEXIST ) {
+               logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
+               return -1;
+       }
+
+       if( io_dir == READER ) {
+               fd = open( wbuf, O_RDONLY  );                   // just open the reader
+               if( fd < 0 ) {
+                       logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
+               }
+       } else {
+               jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
+               if( jfd < 0 ) {
+                       logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
+               }
+
+               fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
+               if( fd < 0 ) {
+                       logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
+               }
+
+               close( jfd );                   // should be safe to close this
+       }
+
+
+       return fd;
+}
+
+/*
+       Given a message type, return the file des of the fifo that
+       the payload should be written to.        Returns the file des, or -1
+       on error. When sussing out a read file descriptor this will
+       block until there is a fifo for the message type which is
+       open for reading.
+
+       If fref is not nil, then a pointer to the fifo info block is returned
+       allowing for direct update of counts after the write.
+*/
+static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
+       fifo_t* fifo;
+       void*   hash;
+
+       if( io_dir == READER ) {                // with an integer key, we nned two hash tables
+               hash = ctx->rd_hash;
+       } else {
+               hash = ctx->wr_hash;
+       }
+
+       if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
+               fifo = (fifo_t *) malloc( sizeof( *fifo ) );
+               if( fifo == NULL ) {
+                       return -1;
+               }
+
+               memset( fifo, 0, sizeof( *fifo ) );
+               fifo->key = mtype;
+               fifo->fd = open_fifo( ctx, mtype, io_dir );
+               rmr_sym_map( hash, mtype, fifo );
+       }
+
+       if( fref != NULL ) {
+               *fref = fifo;
+       }
+       return fifo->fd;
+}
+
+/*
+       Make marking counts easier in code
+*/
+static inline void chalk_error( fifo_t* fifo ) {
+       if( fifo != NULL ) {
+               fifo->drops++;
+               fifo->drops_rp++;
+       }
+}
+
+static inline void chalk_ok( fifo_t* fifo ) {
+       if( fifo != NULL ) {
+               fifo->wcount++;
+               fifo->wcount_rp++;
+       }
+}
+
+/*
+       Callback function driven to traverse the symtab and generate the
+       counts for each fifo.
+*/
+static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
+       fifo_t* fifo;
+       int             report_period = 60;
+
+       if( data ) {
+               report_period = *((int *) data);
+       }
+
+       if( (fifo = (fifo_t *) thing) != NULL ) {
+               logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
+                       fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
+
+               fifo->wcount_rp = 0;            // reset the report counts
+               fifo->drops_rp = 0;
+       }
+}
+
+// ---------- public ------------------------------------------------------
+/*
+       Sets a signal handler for sigpipe so we don't crash if a reader closes the
+       last reading fd on a pipe. We could do this automatically, but if the user
+       programme needs to trap sigpipe too, this gives them the option not to have
+       us interfere.
+*/
+extern int mcl_set_sigh( ) {
+       signal( SIGPIPE, SIG_IGN );
+}
+
+/*
+       "Opens" the interface to RMR such that messages sent to the application will
+       be available via the rmr receive funcitons. This is NOT automatically called
+       by the mk_context function as some applications will be using the mc library
+       for non-RMR, fifo, chores.
+*/
+extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
+       mcl_ctx_t*      ctx;
+       int             announce = 0;
+
+       if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
+               return 0;
+       }
+
+       ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
+       if( ctx->mrc == NULL ) {
+               logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
+               return 0;
+       }
+
+       while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
+               if( announce <= 0 ) {
+                       logit(  LOG_INFO, "waiting for RMR to show ready" );
+                       announce = 10;
+               } else {
+                       announce--;
+               }
+
+               sleep( 1 );
+       }
+
+       return 1;
+}
+
+/*
+       Blocks until a message arives with a good return code or we timeout. Returns the
+       rmr message buffer. Timeout value epxected in seconds.
+*/
+extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
+       mcl_ctx_t*      ctx;
+
+       if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       if( ctx->mrc == NULL ) {
+               logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
+               exit( 1 );
+       }
+
+       do {
+               msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
+       } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
+
+       return msg;
+}
+
+/*
+       Create the context.
+*/
+extern void* mcl_mk_context( char* dir ) {
+       mcl_ctx_t*      ctx;
+
+       if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
+               memset( ctx, 0, sizeof( *ctx ) );
+               ctx->fifo_dir = strdup( dir );
+               ctx->wr_hash = rmr_sym_alloc( 1001 );
+               ctx->rd_hash = rmr_sym_alloc( 1001 );
+
+               if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
+                       logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
+                       free( ctx );
+                       return NULL;
+               }
+       }
+
+       return (void *) ctx;
+}
+
+/*
+       Read the header. Best case we read the expected number of bytes, get all
+       of them and find that they start with the delemiter.  Worst case
+       We have to wait for all of the header, or need to synch at the next
+       delimeter. We assume best case most likely and handle it as such.
+*/
+static void read_header( int fd, char* buf ) {
+       int len;
+       int need = MCL_EXHDR_SIZE;              // total needed
+       int dneed;                                              // delimieter needed
+       int     rlen;
+       char*   rp;                             // read position in buf
+
+       len = read( fd, buf, need );
+       if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
+               return;
+       }
+
+       while( TRUE ) {
+               if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
+                       rp = buf + len;
+                       dneed = strlen( MCL_DELIM ) - len;
+
+                       while( dneed > 0 ) {
+                               len = read( fd, rp, dneed );
+                               dneed -= len;
+                               rp += len;
+                       }
+               }
+
+               if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
+                       need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
+                       rp = buf + (MCL_EXHDR_SIZE - need);
+
+                       while( need > 0 ) {
+                               len = read( fd, rp, need );
+                               need -= len;
+                               rp += len;
+                       }
+
+                       return;
+               }
+
+               while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
+                       len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
+               }
+
+               need = MCL_EXHDR_SIZE - len;
+       }
+}
+
+
+/*
+       Read one record from the fifo that the message type maps to.
+       Writes at max ublen bytes into the ubuf.
+
+       If long_hdrs is true (!0), then we expect that the stream in the fifo
+       has extended headers (<delim><len><time>), and will write the timestamp
+       from the header into the buffer pointed to by timestamp. The buffer is
+       assumed to be at least MCL_TSTAMP_SIZE bytes in length.
+
+       Further, when extended headers are being used, this function will
+       automatically resynchronise if it detects an issue.
+
+       The function could look for the delimiter and automatically detect whether
+       or not extended headers are in use, but if the stream is out of synch on the
+       first read, this cannot be done, so the funciton requires that the caller
+       know that the FIFO contains extended headers.
+*/
+static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
+       int fd;
+       int len;
+       int     msg_len;
+       int     got = 0;                                                // number of bytes we actually got
+       int need;
+       char wbuf[4096];
+       mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
+       fifo_t* fref = NULL;                            // the fifo struct we found
+
+       if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
+               if( long_hdrs ) {
+                       read_header( fd, wbuf );
+                       msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
+                       if( timestamp ) {
+                               strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
+                       }
+               } else {
+                       if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
+                               *timestamp = 0;
+                       }
+
+                       len = read( fd, wbuf, MCL_LEN_SIZE );                   // we assume we will get all 8 as there isn't a way to sync the old stream
+                       msg_len = need = atoi( wbuf );
+               }
+
+
+               if( need > ublen ) {
+                       need = ublen;                                           // cannot give them more than they can take
+               }
+               while( need > 0 ) {
+                       len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
+                       memcpy( ubuf+got, wbuf, len );
+                       got += len;
+                       need -= len;
+               }
+
+               if( msg_len > got ) {                                   // we must ditch rest of this message
+                       need = msg_len = got;
+                       while( need > 0 ) {
+                               len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
+                               need -= len;
+                       }
+               }
+
+               return got;
+       }
+
+       errno = EBADFD;
+       return 0;
+}
+
+/*
+       Read one record from the fifo that the message type maps to.
+       Writes at max ublen bytes into the ubuf. If extended headers are in use
+       this function will ignore the timestamp.
+
+       If long_hdrs is true (!0), then we expect that the stream in the fifo
+       has extended headers (<delim><len><time>).
+*/
+extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
+       return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
+}
+
+/*
+       Read a single message from the FIFO returning it in the caller's buffer. If extended
+       headers are being used, and the caller supplied a timestamp buffer, the timestamp
+       which was in the header will be returned in that buffer.  The return value is the number
+       of bytes in the buffer; 0 indicates an error and errno should be set.
+*/
+extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
+       return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
+}
+
+
+/*
+       Will read messages and fan them out based on the message type. This should not
+       return and if it does the caller should assume an error.
+
+       The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
+       string , followed by that number of 'raw' bytes. The raw bytes are the payload
+       exactly as received.
+
+       The report parameter is the frequency, in seconds, for writing a short
+       status report to stdout. If 0 then it's off.
+
+       If long_hdr is true, then we geneate an extended header with a delimiter and
+       timestamp.
+*/
+extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
+       mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
+       fifo_t*         fifo;                                   // fifo to chalk counts on
+       rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
+       char            header[128];                    // header we'll pop in front of the payload
+       int                     state;
+       int                     fd;                                             // file des to write to
+       long long       total = 0;                              // total messages received and written
+       long long       total_drops = 0;                // total messages received and written
+       long            count = 0;                              // messages received and written during last reporting period
+       long            errors = 0;                             // unsuccessful payload writes
+       long            drops;                                  // number of drops
+       time_t          next_report = 0;                // we'll report every 2 seconds if report is true
+       time_t          now;
+       int                     hwlen;                                  // write len for header
+       void*           rdc_ctx = NULL;                 // raw data capture context
+       void*           rdc_buf = NULL;                 // capture buffer
+
+       if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
+               logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
+               errno = EINVAL;
+               return;
+       }
+
+       if( report < 0 ) {
+               report = 0;
+       }
+
+       rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
+
+       while( 1 ) {
+               mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
+
+               if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0  ) {
+                       fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
+                       if( fd >= 0 ) {
+                               if( long_hdr ) {
+                                       build_hdr( mbuf->len, header, sizeof( header ) );
+                                       hwlen = MCL_EXHDR_SIZE;
+                               } else {
+                                       snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+                                       hwlen = MCL_LEN_SIZE;
+                               }
+
+                               if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
+                                       drops++;
+                                       total_drops++;
+                                       chalk_error( fifo );
+                               } else {
+                                       if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
+                                               errors++;
+                                               chalk_error( fifo );
+                                       } else {
+                                               chalk_ok( fifo );
+                                               count++;
+                                               total++;
+                                       }
+                               }
+                       }
+
+                       if( rdc_ctx != NULL ) {
+                               rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
+                               rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
+                       }
+               }
+
+               if( report ) {
+                       if( (now = time( NULL ) ) > next_report ) {
+                       rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
+                               fflush( stdout );
+
+                               logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
+                                       total, total_drops, report, count, drops, errors );
+                               next_report = now + report;
+                               count = 0;
+                               drops = 0;
+
+                               fflush( stdout );
+                       }
+               }
+       }
+}
+
+