Add ability to capture raw messages
[ric-app/mc.git] / src / sidecars / listener / mcl.c
index 06ceb82..78908d9 100644 (file)
@@ -22,7 +22,7 @@
        Abstract:       The mc listener library content. All external functions
                                should start with mcl_ and all stderr messages should have
                                (mcl) as the first token following the severity indicator.
-                               
+
        Date:           22 August 2019
        Author:         E. Scott Daniels
 */
@@ -36,6 +36,8 @@
 #include <fcntl.h>
 #include <signal.h>
 #include <sys/stat.h>
+#include <sys/types.h>
+
 
 #include <rmr/rmr.h>
 #include <rmr/rmr_symtab.h>
@@ -48,7 +50,6 @@
 #define TRUE   1
 #define FALSE  0
 
-
 /*
        Information about one file descriptor. This is pointed to by the hash
        such that the message type can be used as a key to look up the fifo's
@@ -73,12 +74,71 @@ typedef struct {
        void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
        void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
        char*   fifo_dir;                       // directory where we open fifos
-       
+
 } mcl_ctx_t;
 
 // -------- private -------------------------------------------------------
 
 
+/*
+       Set up for raw data capture. We look for directory overriedes from
+       environment variables, and then invoke the rdc_init() to actually
+       set things upd.
+*/
+static void* setup_rdc() {
+       void*   ctx;
+       int             value;                                                  // value computed for something
+       char*   ep;                                                             // pointer to environment var
+       char*   sdir = "/tmp/rdc/stage";                // default directory names
+       char*   fdir = "/tmp/rdc/final";
+       char*   suffix = ".rdc";
+       char*   done = NULL;
+
+       if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
+               if( ep != NULL  && atoi( ep ) == 0 ) {
+                       logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
+                       return NULL;
+               }
+       }
+
+       if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
+               sdir = ep;
+       } else {
+               mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
+               mkdir( sdir, 0755 );
+       }
+
+       if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
+               fdir = ep;
+       } else {
+               mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
+               mkdir( fdir, 0755 );
+       }
+
+       if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
+               suffix = ep;
+       }
+
+       if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
+               done = ep;
+       }
+
+       ctx = rdc_init( sdir, fdir, suffix, done );
+       if( ctx == NULL ) {
+               logit( LOG_ERR, "rdc_init did not generate a context" );
+       } else {
+               logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
+               logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
+       }
+
+       if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
+               value = atoi( ep );
+               logit( LOG_INFO, "setting frequency: %d", value );
+               rdc_set_freq( ctx, value );     
+       }
+       return ctx;
+}
+
 /*
        Builds an extended header in the buffer provided, or allocates a new buffer if
        dest is nil. The header is of the form:
@@ -110,10 +170,10 @@ static char* build_hdr( int len, char* dest, int dest_len ) {
        return dest;
 }
 
-/*     
+/*
        Build a file name and open. The io_direction is either READER or
        WRITER.  For a writer we must 'trick' the system into allowing us
-       to open a pipe for writing in non-blocking mode so that we can 
+       to open a pipe for writing in non-blocking mode so that we can
        report on drops (messages we couldn't write because there was no
        reader).  The trick is to open a reader on the pipe so that when
        we open the writer there's a reader and the open won't fail. As
@@ -132,27 +192,27 @@ static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
        }
 
        snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
-       
+
        state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
        if( state != 0 && errno != EEXIST ) {
-               fprintf( stderr, "[ERR] (mcl) unable to create fifo: %s: %s\n", wbuf, strerror( errno ) );
+               logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
                return -1;
        }
 
        if( io_dir == READER ) {
                fd = open( wbuf, O_RDONLY  );                   // just open the reader
                if( fd < 0 ) {
-                       fprintf( stderr, "[ERR] (mcl) fifo open failed (ro): %s: %s\n", wbuf, strerror( errno ) );
+                       logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
                }
        } else {
                jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
                if( jfd < 0 ) {
-                       fprintf( stderr, "[ERR] (mcl) fifo open failed (rw): %s: %s\n", wbuf, strerror( errno ) );
+                       logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
                }
-       
+
                fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
                if( fd < 0 ) {
-                       fprintf( stderr, "[ERR] (mcl) fifo open failed (wo): %s: %s\n", wbuf, strerror( errno ) );
+                       logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
                }
 
                close( jfd );                   // should be safe to close this
@@ -218,7 +278,7 @@ static inline void chalk_ok( fifo_t* fifo ) {
 }
 
 /*
-       Callback function driven to traverse the symtab and generate the 
+       Callback function driven to traverse the symtab and generate the
        counts for each fifo.
 */
 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
@@ -230,7 +290,7 @@ static void wr_stats( void* st, void* entry, char const* name, void* thing, void
        }
 
        if( (fifo = (fifo_t *) thing) != NULL ) {
-               fprintf( stdout, "[STAT] (mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld\n", 
+               logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
                        fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
 
                fifo->wcount_rp = 0;            // reset the report counts
@@ -265,13 +325,13 @@ extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
 
        ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
        if( ctx->mrc == NULL ) {
-               fprintf( stderr, "[CRIT]  unable to initialise RMr\n" );
+               logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
                return 0;
        }
 
        while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
                if( announce <= 0 ) {
-                       fprintf( stderr, "[INFO] waiting for RMR to show ready\n" );
+                       logit(  LOG_INFO, "waiting for RMR to show ready" );
                        announce = 10;
                } else {
                        announce--;
@@ -284,7 +344,7 @@ extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
 }
 
 /*
-       Blocks until a message arives with a good return code or we timeout. Returns the 
+       Blocks until a message arives with a good return code or we timeout. Returns the
        rmr message buffer. Timeout value epxected in seconds.
 */
 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
@@ -295,7 +355,7 @@ extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
        }
 
        if( ctx->mrc == NULL ) {
-               fprintf( stderr, "bad context\n" );
+               logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
                exit( 1 );
        }
 
@@ -319,7 +379,7 @@ extern      void* mcl_mk_context( char* dir ) {
                ctx->rd_hash = rmr_sym_alloc( 1001 );
 
                if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
-                       fprintf( stderr, "[ERR] (mcl) unable to allocate hash table for fifo keys\n" );
+                       logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
                        free( ctx );
                        return NULL;
                }
@@ -345,12 +405,12 @@ static void read_header( int fd, char* buf ) {
        if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
                return;
        }
-       
+
        while( TRUE ) {
                if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
                        rp = buf + len;
                        dneed = strlen( MCL_DELIM ) - len;
-       
+
                        while( dneed > 0 ) {
                                len = read( fd, rp, dneed );
                                dneed -= len;
@@ -361,7 +421,7 @@ static void read_header( int fd, char* buf ) {
                if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
                        need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
                        rp = buf + (MCL_EXHDR_SIZE - need);
-       
+
                        while( need > 0 ) {
                                len = read( fd, rp, need );
                                need -= len;
@@ -395,7 +455,7 @@ static void read_header( int fd, char* buf ) {
        The function could look for the delimiter and automatically detect whether
        or not extended headers are in use, but if the stream is out of synch on the
        first read, this cannot be done, so the funciton requires that the caller
-       know that the FIFO contains extended headers.  
+       know that the FIFO contains extended headers.
 */
 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
        int fd;
@@ -428,12 +488,12 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
                        msg_len = need = atoi( wbuf );
                }
 
-               
+
                if( need > ublen ) {
                        need = ublen;                                           // cannot give them more than they can take
                }
                while( need > 0 ) {
-                       len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );          
+                       len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
                        memcpy( ubuf+got, wbuf, len );
                        got += len;
                        need -= len;
@@ -442,7 +502,7 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
                if( msg_len > got ) {                                   // we must ditch rest of this message
                        need = msg_len = got;
                        while( need > 0 ) {
-                               len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );          
+                               len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
                                need -= len;
                        }
                }
@@ -470,7 +530,7 @@ extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int lon
        Read a single message from the FIFO returning it in the caller's buffer. If extended
        headers are being used, and the caller supplied a timestamp buffer, the timestamp
        which was in the header will be returned in that buffer.  The return value is the number
-       of bytes in the buffer; 0 indicates an error and errno should be set.   
+       of bytes in the buffer; 0 indicates an error and errno should be set.
 */
 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
        return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
@@ -479,10 +539,10 @@ extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int l
 
 /*
        Will read messages and fan them out based on the message type. This should not
-       return and if it does the caller should assume an error. 
+       return and if it does the caller should assume an error.
 
        The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
-       string , followed by that number of 'raw' bytes. The raw bytes are the payload 
+       string , followed by that number of 'raw' bytes. The raw bytes are the payload
        exactly as received.
 
        The report parameter is the frequency, in seconds, for writing a short
@@ -495,7 +555,7 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
        mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
        fifo_t*         fifo;                                   // fifo to chalk counts on
        rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
-       char            wbuf[128];                              // buffer to build len string in
+       char            header[128];                    // header we'll pop in front of the payload
        int                     state;
        int                     fd;                                             // file des to write to
        long long       total = 0;                              // total messages received and written
@@ -506,9 +566,11 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
        time_t          next_report = 0;                // we'll report every 2 seconds if report is true
        time_t          now;
        int                     hwlen;                                  // write len for header
+       void*           rdc_ctx = NULL;                 // raw data capture context
+       void*           rdc_buf = NULL;                 // capture buffer
 
        if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
-               fprintf( stderr, "[ERR] (mcl) invalid context given to fanout\n" );
+               logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
                errno = EINVAL;
                return;
        }
@@ -517,6 +579,8 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                report = 0;
        }
 
+       rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
+
        while( 1 ) {
                mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
 
@@ -524,19 +588,19 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                        fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
                        if( fd >= 0 ) {
                                if( long_hdr ) {
-                                       build_hdr( mbuf->len, wbuf, sizeof( wbuf ) );
+                                       build_hdr( mbuf->len, header, sizeof( header ) );
                                        hwlen = MCL_EXHDR_SIZE;
                                } else {
-                                       snprintf( wbuf, sizeof( wbuf ), "%07d", mbuf->len );                    // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+                                       snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
                                        hwlen = MCL_LEN_SIZE;
                                }
 
-                               if( (state = write( fd, wbuf, hwlen )) != hwlen ) {             // write exactly MCL_LEN_SIZE bytes from the buffer
+                               if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
                                        drops++;
                                        total_drops++;
                                        chalk_error( fifo );
                                } else {
-                                       if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload      
+                                       if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
                                                errors++;
                                                chalk_error( fifo );
                                        } else {
@@ -546,6 +610,11 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                                        }
                                }
                        }
+
+                       if( rdc_ctx != NULL ) {
+                               rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
+                               rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
+                       }
                }
 
                if( report ) {
@@ -553,7 +622,7 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                        rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
                                fflush( stdout );
 
-                               fprintf( stdout, "[STAT] (mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors\n", 
+                               logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
                                        total, total_drops, report, count, drops, errors );
                                next_report = now + report;
                                count = 0;