Add ability to capture raw messages 92/1092/3
authorE. Scott Daniels <daniels@research.att.com>
Mon, 7 Oct 2019 19:39:59 +0000 (15:39 -0400)
committerE. Scott Daniels <daniels@research.att.com>
Wed, 16 Oct 2019 13:46:38 +0000 (09:46 -0400)
The listener will now capture raw messages and write them to
files which we assume a collection agent will gather and
send away to be saved.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I172c34976cf59853f0fb0dd7e0d7912197fdb0de

Add copy to support xdev directories

If the rename of a raw capture file fails with because
src and target are not on the same filesystem, then we
will copy the file.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I378dae46011c5480aecde62d484c98377f39026a

src/sidecars/listener/.gitignore
src/sidecars/listener/Makefile
src/sidecars/listener/README
src/sidecars/listener/TESTING [new file with mode: 0644]
src/sidecars/listener/build_images.sh
src/sidecars/listener/mc_listener.c
src/sidecars/listener/mcl.c
src/sidecars/listener/mcl.h
src/sidecars/listener/rdc.c [new file with mode: 0644]
src/sidecars/listener/verify.sh

index 36f7113..1b2fc2a 100644 (file)
@@ -22,7 +22,7 @@
 
 binaries = mc_listener
 test_progs = sender unit_test pipe_reader
-lib_obj = mcl.o
+lib_obj = mcl.o rdc.o
 lib_h = mcl.h
 
 coverage_opts = -ftest-coverage -fprofile-arcs
@@ -30,8 +30,8 @@ coverage_opts = -ftest-coverage -fprofile-arcs
 # make with no parms should just build 'production' binaries
 all: $(binaries) mc_listener
 
-libmcl.a:      $(lib_obj) $(lib_h)
-       ar -v -r libmcl.a $<
+libmcl.a::     $(lib_obj) $(lib_h)
+       ar -v -r libmcl.a $(lib_obj)
 
 mc_listener: mc_listener.c libmcl.a
        gcc mc_listener.c -o mc_listener -L. -lmcl  -lrmr_nng -lnng -lm -lpthread
index 4b2da47..0036969 100644 (file)
@@ -70,8 +70,77 @@ A very small set of unit tests are provided for the library functions in
 mcl.c.  Because of the nature of the fanout function, which blocks waiting
 on RMR messages, it is not possible to unit test that bit of code.
 
+
+Data Capture
+In addition to writing received messages to a FIFO, each message is 
+captured in a raw data file. These files are created in a staging
+directory (/tmp/rdc/stage by default) and moved to a final directory
+(/tmp/rdc/final by default) every 5 minutes (300 seconds).  The files
+are named MCLT[src]_<timestamp><.suffix>; the default suffix is ".rdc".
+The "src" string is optional and can be used to disabmiguate files
+if merged with capture files from different environments.
+
+The data capture is controlled by setting environment variables:
+
+ MCL_RDC_ENABLE: If set to 0 the raw data capture will be disabled.
+                               (It is on by default if this variable is not defined.)
+
+ MCL_RDC_STAGE: overrides the directory where raw data capture 
+                               files are staged.
+
+ MCL_RDC_FINAL: overrides the directory where raw data capture files 
+                               are placed for export.
+
+ MCL_RDC_SUFFIX: the suffix written on each raw data capture file; 
+                               must include '.' if a dot is desired and is written only on
+                               the final file (not the staged file).
+
+ MCL_RDC_SOURCE: a short string used as source identification in rdc file names.
+                               This defaults to nothing and should begin, but not end,
+                               with an underbar (e.g. _host1).
+
+ MCL_RDC_FREQ: The frequency with which files are closed and moved from the
+                               staging to final directory. The default if 300 sec.
+
+The captured data is saved in the following format:
+       <delim><mtype><len><fifo-buffer>
+
+Where
+       <delim> is a 4 character delimiter for synchronisation (@RDC)
+
+       <mtype> is an 8 character field containing a nil terminated ACII
+               value that is the message type of the received message.
+
+       <len> is an 8 character field containing a nil terminated ASCII
+               value that is the length of the <fifo-buffer>
+
+       <fifo-buffer> is the exact contents that were written to the FIFO.
+
+Thus to "replay" the captured data, a decoder need read just the first 
+20 bytes, convert the message type and length, read the payload, and 
+then write the payload to a FIFO
+
+Staging and Final Directories
+If the staging and final directories (/tmp/rdc/stage and /tmp/rdc/final
+by default) are on the same filsystem, then the rename() system call is
+used to switch the inode reference to the file from one directory to the
+other. This is the preferred setup.  However, if it is not possible for
+these directories to exist on the same filesystem, then the listener will
+copy the file "manually." During the copy, the filename in the final 
+directory will have a leading dot (.) charcter, and the file will be 
+created with a write only by owner mode (0200) and not switched to readable
+until after it is closed; just before renaming it to remove the leading 
+dot. 
+
+Any file capture utillity should either ignore files with leading dot
+characters, or files which do not have a read bit set in their mode. 
+
+
+
 FIFO Reader
 The pipe_reader programme is a simple application which uses the mcl.c 
 library functions to open and read from a single pipe.  If the -e option
 is given it will expect that data in the FIFO has extended headers. Use
 the -? option (or -h) to generate a full usage statement.
+
+
diff --git a/src/sidecars/listener/TESTING b/src/sidecars/listener/TESTING
new file mode 100644 (file)
index 0000000..c264378
--- /dev/null
@@ -0,0 +1,111 @@
+Quick test
+Build the container, and run the test:
+       # the container version number may be different than illustrated here
+       bash build_images.sh
+       docker run -it --rm mc_listener:1.2.1 bash /playpen/bin/verify.sh -l
+
+
+
+Indepth Testing Details
+The verify.sh script can be used to verify that the mc_listener binary
+in the container is working.  The script will create a sending process
+that will send messages to the listener.  It will also create a pipe
+reader process for each FIFO that is expected to be created by the 
+listener.  
+
+The default test
+       bash /playpen/bin/verify.sh
+
+The default is a short, approximately 15 second, test which
+will generate all of the FIFOs, and a single raw data capture file
+in the staging directory.  The test isn't long enough for the raw
+data capture mechanism to attempt to "roll" the capture file.
+
+The long test
+       bash /playpen/bin/verify.sh -l
+
+The long test runs for approximately 150 seconds and sets the roll
+frequency on the capture files to 13 seconds.  This test should
+generate several files in the "final" directory. One, in progress,
+file should be left in the staging directory.
+
+Copy vs Rename
+By default the raw data capture will attempt to rename the file to
+move it from staging to final. However, if these directories are not
+on the same filesystem the rename will fail.  In this case, the rdc
+code will copy the file.  These two mechanisms can be tested by
+supplying the container with an external volume, /data, which contains
+a final directory.  For example:
+
+       mkdir -p /tmp/rdc/final
+       docker run -v /tmp/rdc:/data --rm -it mc_listener:1.2.0 bash /playpen/bin/verify.sh -l
+
+When the verify script sees /data/final it will set that as the directory
+for the finished files rather than using the default which is on the
+same filesystem as the staging directory.  The staging directory can
+be changed to be an external directory by adding ./stage to the volume
+which is mounted. For example
+
+       mkdir -p /tmp/rdc/stage
+
+The verify script will find this and switch away from the default and
+allow a test with both directories to originate from outside of the
+container.
+
+
+Sample output
+The following is sample output from running the long test:
+
+
+>> docker run -v /tmp/foo:/data --rm -it mc_listener:1.2.0 bash /playpen/bin/verify.sh -l
+### found /data/final using that as final directory
+starting listener
+starting pipe reader 0
+starting pipe reader 1
+starting pipe reader 2
+starting pipe reader 3
+starting pipe reader 4
+starting pipe reader 5
+starting pipe reader 6
+starting sender
+stopping listener
+stopping sender
+stopping pipe reader 0
+stopping pipe reader 1
+stopping pipe reader 3
+stopping pipe reader 4
+stopping pipe reader 5
+stopping pipe reader 2
+stopping pipe reader 6
+all functions stopped; looking at logs
+----- logs ---------
+-rw-r--r-- 1 root root  41337 Oct  8 16:24 /tmp/listen.log
+-rw-r--r-- 1 root root      0 Oct  8 16:22 /tmp/pr.0.log
+-rw-r--r-- 1 root root 176128 Oct  8 16:24 /tmp/pr.1.log
+-rw-r--r-- 1 root root 176128 Oct  8 16:24 /tmp/pr.2.log
+-rw-r--r-- 1 root root 176128 Oct  8 16:24 /tmp/pr.3.log
+-rw-r--r-- 1 root root 176128 Oct  8 16:24 /tmp/pr.4.log
+-rw-r--r-- 1 root root 176128 Oct  8 16:24 /tmp/pr.5.log
+-rw-r--r-- 1 root root 176128 Oct  8 16:24 /tmp/pr.6.log
+-rw-r--r-- 1 root root 195232 Oct  8 16:24 /tmp/sender.log
+[OK]    All logs seem good
+[OK]    Found expected fifos
+[OK]    Found staging direcory (/tmp/rdc/stage)
+total 100
+drwxr-xr-x 2 root root  4096 Oct  8 16:24 .
+drwxr-xr-x 3 root root  4096 Oct  8 16:22 ..
+--w------- 1 root root 93628 Oct  8 16:24 MCLT_1570551840
+[OK]    Found final direcory (/data/final)
+total 1040
+drwxrwxr-x 2 1001 1001   4096 Oct  8 16:24 .
+drwxrwxr-x 3 1001 1001   4096 Oct  8 16:22 ..
+-rw-rw-r-- 1 root root 146157 Oct  8 16:22 MCLT_1570551749.rdc
+-rw-rw-r-- 1 root root 149222 Oct  8 16:22 MCLT_1570551762.rdc
+-rw-rw-r-- 1 root root 149199 Oct  8 16:23 MCLT_1570551775.rdc
+-rw-rw-r-- 1 root root 149200 Oct  8 16:23 MCLT_1570551788.rdc
+-rw-rw-r-- 1 root root 149209 Oct  8 16:23 MCLT_1570551801.rdc
+-rw-rw-r-- 1 root root 149191 Oct  8 16:23 MCLT_1570551814.rdc
+-rw-rw-r-- 1 root root 149206 Oct  8 16:24 MCLT_1570551827.rdc
+[OK]   Found 7 files in final directory (/data/final)
+[PASS]
+
index 4fe1219..d32b4f5 100755 (executable)
@@ -40,8 +40,8 @@ then
 fi
 
 
-ver=${1:-1.1}
-patch=${2:-0}
+ver=${1:-1.2}
+patch=${2:-1}
 
 if (( skip_dev == 0 ))
 then
index a5effcd..f9fab09 100644 (file)
@@ -60,6 +60,15 @@ static void bad_arg( char* what ) {
 static void usage( char* argv0 ) {
        fprintf( stderr, "usage: %s [-d fifo-dir] [-e] [-p listen-port] [-q | -r report-freq]\n", argv0 );
        fprintf( stderr, "  -e  disable extended header in buffers written to FIFOs\n" );
+       fprintf( stderr, "\n" );
+       fprintf( stderr, "The following environment variables may be set to affect operation:\n" );
+       fprintf( stderr, "  MCL_RDC_STAGE: the directory where raw data capture files are staged. (/tmp/rdc/stage)\n" );
+       fprintf( stderr, "  MCL_RDC_FINAL: the directory where raw data capture files are placed for export. (/tmp/rdc/final)\n" );
+       fprintf( stderr, "  MCL_RDC_SUFFIX: the suffix written on each raw data capture file; must include '.'. (.rdc)\n" );
+       fprintf( stderr, "  MCL_RDC_SOURCE: a short string used as source identification in rdc file names.\n" );
+       fprintf( stderr, "  MCL_RDC_FREQ: the amount of time (seconds) that raw capture files are rolled. (300)\n" );
+       fprintf( stderr, "\nIf either final or staging directories are defined by environment vars, they MUST exist.\n" );
+       fprintf( stderr, "\n" );
 }
 
 //------------------------------------------------------------------------------------------
index 06ceb82..78908d9 100644 (file)
@@ -22,7 +22,7 @@
        Abstract:       The mc listener library content. All external functions
                                should start with mcl_ and all stderr messages should have
                                (mcl) as the first token following the severity indicator.
-                               
+
        Date:           22 August 2019
        Author:         E. Scott Daniels
 */
@@ -36,6 +36,8 @@
 #include <fcntl.h>
 #include <signal.h>
 #include <sys/stat.h>
+#include <sys/types.h>
+
 
 #include <rmr/rmr.h>
 #include <rmr/rmr_symtab.h>
@@ -48,7 +50,6 @@
 #define TRUE   1
 #define FALSE  0
 
-
 /*
        Information about one file descriptor. This is pointed to by the hash
        such that the message type can be used as a key to look up the fifo's
@@ -73,12 +74,71 @@ typedef struct {
        void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
        void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
        char*   fifo_dir;                       // directory where we open fifos
-       
+
 } mcl_ctx_t;
 
 // -------- private -------------------------------------------------------
 
 
+/*
+       Set up for raw data capture. We look for directory overriedes from
+       environment variables, and then invoke the rdc_init() to actually
+       set things upd.
+*/
+static void* setup_rdc() {
+       void*   ctx;
+       int             value;                                                  // value computed for something
+       char*   ep;                                                             // pointer to environment var
+       char*   sdir = "/tmp/rdc/stage";                // default directory names
+       char*   fdir = "/tmp/rdc/final";
+       char*   suffix = ".rdc";
+       char*   done = NULL;
+
+       if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
+               if( ep != NULL  && atoi( ep ) == 0 ) {
+                       logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
+                       return NULL;
+               }
+       }
+
+       if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
+               sdir = ep;
+       } else {
+               mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
+               mkdir( sdir, 0755 );
+       }
+
+       if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
+               fdir = ep;
+       } else {
+               mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
+               mkdir( fdir, 0755 );
+       }
+
+       if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
+               suffix = ep;
+       }
+
+       if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
+               done = ep;
+       }
+
+       ctx = rdc_init( sdir, fdir, suffix, done );
+       if( ctx == NULL ) {
+               logit( LOG_ERR, "rdc_init did not generate a context" );
+       } else {
+               logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
+               logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
+       }
+
+       if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
+               value = atoi( ep );
+               logit( LOG_INFO, "setting frequency: %d", value );
+               rdc_set_freq( ctx, value );     
+       }
+       return ctx;
+}
+
 /*
        Builds an extended header in the buffer provided, or allocates a new buffer if
        dest is nil. The header is of the form:
@@ -110,10 +170,10 @@ static char* build_hdr( int len, char* dest, int dest_len ) {
        return dest;
 }
 
-/*     
+/*
        Build a file name and open. The io_direction is either READER or
        WRITER.  For a writer we must 'trick' the system into allowing us
-       to open a pipe for writing in non-blocking mode so that we can 
+       to open a pipe for writing in non-blocking mode so that we can
        report on drops (messages we couldn't write because there was no
        reader).  The trick is to open a reader on the pipe so that when
        we open the writer there's a reader and the open won't fail. As
@@ -132,27 +192,27 @@ static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
        }
 
        snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
-       
+
        state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
        if( state != 0 && errno != EEXIST ) {
-               fprintf( stderr, "[ERR] (mcl) unable to create fifo: %s: %s\n", wbuf, strerror( errno ) );
+               logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
                return -1;
        }
 
        if( io_dir == READER ) {
                fd = open( wbuf, O_RDONLY  );                   // just open the reader
                if( fd < 0 ) {
-                       fprintf( stderr, "[ERR] (mcl) fifo open failed (ro): %s: %s\n", wbuf, strerror( errno ) );
+                       logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
                }
        } else {
                jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
                if( jfd < 0 ) {
-                       fprintf( stderr, "[ERR] (mcl) fifo open failed (rw): %s: %s\n", wbuf, strerror( errno ) );
+                       logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
                }
-       
+
                fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
                if( fd < 0 ) {
-                       fprintf( stderr, "[ERR] (mcl) fifo open failed (wo): %s: %s\n", wbuf, strerror( errno ) );
+                       logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
                }
 
                close( jfd );                   // should be safe to close this
@@ -218,7 +278,7 @@ static inline void chalk_ok( fifo_t* fifo ) {
 }
 
 /*
-       Callback function driven to traverse the symtab and generate the 
+       Callback function driven to traverse the symtab and generate the
        counts for each fifo.
 */
 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
@@ -230,7 +290,7 @@ static void wr_stats( void* st, void* entry, char const* name, void* thing, void
        }
 
        if( (fifo = (fifo_t *) thing) != NULL ) {
-               fprintf( stdout, "[STAT] (mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld\n", 
+               logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
                        fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
 
                fifo->wcount_rp = 0;            // reset the report counts
@@ -265,13 +325,13 @@ extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
 
        ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
        if( ctx->mrc == NULL ) {
-               fprintf( stderr, "[CRIT]  unable to initialise RMr\n" );
+               logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
                return 0;
        }
 
        while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
                if( announce <= 0 ) {
-                       fprintf( stderr, "[INFO] waiting for RMR to show ready\n" );
+                       logit(  LOG_INFO, "waiting for RMR to show ready" );
                        announce = 10;
                } else {
                        announce--;
@@ -284,7 +344,7 @@ extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
 }
 
 /*
-       Blocks until a message arives with a good return code or we timeout. Returns the 
+       Blocks until a message arives with a good return code or we timeout. Returns the
        rmr message buffer. Timeout value epxected in seconds.
 */
 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
@@ -295,7 +355,7 @@ extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
        }
 
        if( ctx->mrc == NULL ) {
-               fprintf( stderr, "bad context\n" );
+               logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
                exit( 1 );
        }
 
@@ -319,7 +379,7 @@ extern      void* mcl_mk_context( char* dir ) {
                ctx->rd_hash = rmr_sym_alloc( 1001 );
 
                if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
-                       fprintf( stderr, "[ERR] (mcl) unable to allocate hash table for fifo keys\n" );
+                       logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
                        free( ctx );
                        return NULL;
                }
@@ -345,12 +405,12 @@ static void read_header( int fd, char* buf ) {
        if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
                return;
        }
-       
+
        while( TRUE ) {
                if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
                        rp = buf + len;
                        dneed = strlen( MCL_DELIM ) - len;
-       
+
                        while( dneed > 0 ) {
                                len = read( fd, rp, dneed );
                                dneed -= len;
@@ -361,7 +421,7 @@ static void read_header( int fd, char* buf ) {
                if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
                        need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
                        rp = buf + (MCL_EXHDR_SIZE - need);
-       
+
                        while( need > 0 ) {
                                len = read( fd, rp, need );
                                need -= len;
@@ -395,7 +455,7 @@ static void read_header( int fd, char* buf ) {
        The function could look for the delimiter and automatically detect whether
        or not extended headers are in use, but if the stream is out of synch on the
        first read, this cannot be done, so the funciton requires that the caller
-       know that the FIFO contains extended headers.  
+       know that the FIFO contains extended headers.
 */
 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
        int fd;
@@ -428,12 +488,12 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
                        msg_len = need = atoi( wbuf );
                }
 
-               
+
                if( need > ublen ) {
                        need = ublen;                                           // cannot give them more than they can take
                }
                while( need > 0 ) {
-                       len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );          
+                       len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
                        memcpy( ubuf+got, wbuf, len );
                        got += len;
                        need -= len;
@@ -442,7 +502,7 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
                if( msg_len > got ) {                                   // we must ditch rest of this message
                        need = msg_len = got;
                        while( need > 0 ) {
-                               len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );          
+                               len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
                                need -= len;
                        }
                }
@@ -470,7 +530,7 @@ extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int lon
        Read a single message from the FIFO returning it in the caller's buffer. If extended
        headers are being used, and the caller supplied a timestamp buffer, the timestamp
        which was in the header will be returned in that buffer.  The return value is the number
-       of bytes in the buffer; 0 indicates an error and errno should be set.   
+       of bytes in the buffer; 0 indicates an error and errno should be set.
 */
 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
        return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
@@ -479,10 +539,10 @@ extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int l
 
 /*
        Will read messages and fan them out based on the message type. This should not
-       return and if it does the caller should assume an error. 
+       return and if it does the caller should assume an error.
 
        The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
-       string , followed by that number of 'raw' bytes. The raw bytes are the payload 
+       string , followed by that number of 'raw' bytes. The raw bytes are the payload
        exactly as received.
 
        The report parameter is the frequency, in seconds, for writing a short
@@ -495,7 +555,7 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
        mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
        fifo_t*         fifo;                                   // fifo to chalk counts on
        rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
-       char            wbuf[128];                              // buffer to build len string in
+       char            header[128];                    // header we'll pop in front of the payload
        int                     state;
        int                     fd;                                             // file des to write to
        long long       total = 0;                              // total messages received and written
@@ -506,9 +566,11 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
        time_t          next_report = 0;                // we'll report every 2 seconds if report is true
        time_t          now;
        int                     hwlen;                                  // write len for header
+       void*           rdc_ctx = NULL;                 // raw data capture context
+       void*           rdc_buf = NULL;                 // capture buffer
 
        if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
-               fprintf( stderr, "[ERR] (mcl) invalid context given to fanout\n" );
+               logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
                errno = EINVAL;
                return;
        }
@@ -517,6 +579,8 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                report = 0;
        }
 
+       rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
+
        while( 1 ) {
                mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
 
@@ -524,19 +588,19 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                        fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
                        if( fd >= 0 ) {
                                if( long_hdr ) {
-                                       build_hdr( mbuf->len, wbuf, sizeof( wbuf ) );
+                                       build_hdr( mbuf->len, header, sizeof( header ) );
                                        hwlen = MCL_EXHDR_SIZE;
                                } else {
-                                       snprintf( wbuf, sizeof( wbuf ), "%07d", mbuf->len );                    // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+                                       snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
                                        hwlen = MCL_LEN_SIZE;
                                }
 
-                               if( (state = write( fd, wbuf, hwlen )) != hwlen ) {             // write exactly MCL_LEN_SIZE bytes from the buffer
+                               if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
                                        drops++;
                                        total_drops++;
                                        chalk_error( fifo );
                                } else {
-                                       if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload      
+                                       if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
                                                errors++;
                                                chalk_error( fifo );
                                        } else {
@@ -546,6 +610,11 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                                        }
                                }
                        }
+
+                       if( rdc_ctx != NULL ) {
+                               rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
+                               rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
+                       }
                }
 
                if( report ) {
@@ -553,7 +622,7 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
                        rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
                                fflush( stdout );
 
-                               fprintf( stdout, "[STAT] (mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors\n", 
+                               logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
                                        total, total_drops, report, count, drops, errors );
                                next_report = now + report;
                                count = 0;
index 249e851..37f13f0 100644 (file)
@@ -30,7 +30,7 @@
 #include <rmr/rmr.h>
 #include <rmr/rmr_symtab.h>
 
-// ------- public constants and structs -------------------------------------------------
+// ------- public constants and structs -------------------------------------------------------------------
 
 #define MCL_LEN_SIZE 8         // number of bytes the length has in both short and extended header
 #define MCL_DELIM_SIZE 4       // number of bytes in extended header delimiter
 #define MCL_NOWAIT     0       // do not wait for RMR route table to arrive
 #define MCL_WAIT       1       // block reader start until RMR route table is initialised
 
+// ---- rdc definitions ----------------------------------------------------------------------------------
+#define LOG_CRIT       0
+#define LOG_ERR                1
+#define LOG_WARN       2
+#define LOG_INFO       3
+#define LOG_STAT       4               // stats messages go to stdout
+
+
 //------------ prototypes --------------------------------------------------------------
 extern void mcl_fifo_fanout( void* ctx, int report, int long_hdrs );
 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout );
@@ -54,4 +62,13 @@ extern int mcl_fifo_tsread1( void* vctx, int mtype, char* ubuf, int ublen, int l
 extern int mcl_set_sigh( );
 extern int mcl_start_listening( void* vctx, char* port, int wait4ready );
 
+// ---- these can be used by external programmes, but it liekely doesn't make sense to do so ----
+extern void logit( int level, char* fmt, ... );
+extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix );
+extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* capture_buf );
+extern void rdc_close( void* rdl_ctx );
+extern void rdc_set_freq( void* rdl_ctx, int freq );
+extern int rdc_write( void* rdl_ctx, void* rdc_buffer, char* payload, int len );
+
+
 #endif
diff --git a/src/sidecars/listener/rdc.c b/src/sidecars/listener/rdc.c
new file mode 100644 (file)
index 0000000..bf7300e
--- /dev/null
@@ -0,0 +1,504 @@
+// vim: ts=4 sw=4 noet:
+/*
+--------------------------------------------------------------------------------
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+--------------------------------------------------------------------------------
+*/
+
+/*
+       Mnemonic:       rdc.c.
+       Abstract:       This module contains library functions which implement the
+                               raw data collection (rdc) portion of the listener. The fanout
+                               function in the library will call these functions to capture
+                               raw messages, with a header, for later "replay" or other
+                               analysis.  Messages are captured as they are written to the
+                               FIFO, with a small header added:
+                                       @RDC<mtype><len>
+
+                               Where <mtype> is the message type of the message received and
+                               <len> is the length of the data that was written to the FIFO.
+
+                               
+       Date:           06 Oct 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <stdarg.h>
+#include <errno.h> 
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+#include <sys/stat.h>
+
+#include "mcl.h"
+
+/*
+       A capture buffer. The listener writes FIFO output in two stages, thus
+       we provide the ability to initialise a capture with the msg type and 
+       the MRL header, then to write the payload using this saved data. The
+       idea is to not require the caller to save the header.
+*/
+typedef struct {
+       char    uheader[100];   // user header (max 100 bytes)
+       int             uhlen;                  // length of user header
+       int             mtype;                  // message type
+} cap_buf_t;
+
+typedef struct {
+       int             flags;                  // DFFL_* constatnts
+       int             frequency;              // the frequency at which files are rolled
+       int             fd;                             // current open write file
+       char*   sdir;                   // staging directory
+       char*   fdir;                   // final directory
+       char*   suffix;                 // suffix for final file (must include . if needed)
+       char*   dsuffix;                // suffix for done file
+       char*   basename;               // base name of the file being written to
+       char*   openname;               // full filename that is open for writing
+       char*   source;                 // added to output file names to differentiate the source
+       time_t  next_roll;              // time we should roll the file
+} rdc_ctx_t;
+
+#define RDC_DELIM      "@RDC"          // delimeter used in our file
+
+// -------------------------------------------------------------------------------------------
+
+/*
+       Copy and unlink old file is successful. During writing the file mode will
+       be write only for the owner (0200). If the mode passed in is not 0, then
+       just prior to renaming the file to 'new', the mode will be changed. If
+       mode is 0, then we assume the caller will change the file mode when 
+       appropriate.
+
+       There seems to be an issue with some collectors and thus it is required 
+       to initially name the file with a leading dot (.) until the file is closed
+       and ready to be read by external processes (marking it write only seems
+       not to discourage them from trying!).
+*/
+static int copy_unlink( char* old, char* new, int mode ) {
+       char    buf[8192];      // read buffer
+       char*   tfname;         // temp file name while we have it open
+       char*   wbuf;           // work buffer for disecting the new filename
+       char*   tok;            // token pointer into a buffer
+       int     len;
+       int     rfd;            // read/write file descriptors
+       int     wfd;
+       int start;
+       int     state;
+       int remain;             // number of bytes remaining to write
+
+       
+       errno = 0;
+       if( (rfd = open( old, O_RDONLY )) < 0 ) {
+               logit( LOG_ERR, "copy: open src for copy failed: %s: %s\n", old, strerror( errno ) );
+               return -1;
+       }
+
+       len = sizeof( char ) * (strlen( new ) + 2 );                    // space needed for temp file name with added .
+       tfname = (char *) malloc( len );
+       wbuf = strdup( new );                                                                                                   // we need to trash the string, so copy
+       tok = strrchr( wbuf, '/' );                                                                                             // find end of path
+       if( tok ) {
+               *tok = 0;
+               tok++;
+               snprintf( tfname, len, "%s/.%s", wbuf, tok );                                                   // insert . to "hide" from collector
+       } else {
+               snprintf( tfname, len, ".%s", wbuf );                                                                   // no path, just add leading .
+       }
+       //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
+
+       if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
+               logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s\n", tfname, strerror( errno ) );
+               return -1;
+       }
+
+       while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
+               remain = len;
+               start = 0;
+               while( remain > 0 ) {
+                       errno = 0;
+                       if( (len = write( wfd, &buf[start], len )) != remain ) {                // short write
+                               if( errno != EINTR && errno != EAGAIN ) {
+                                       logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
+                                       close( wfd );
+                                       close( rfd );
+                                       return -1;
+                               }
+                       }
+
+                       remain -= len;          // recompute what we need to write, and try again
+                       start += len;
+               }
+       }
+
+       state = close( wfd );
+       close( rfd );
+
+       if( state < 0 ) {
+               logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
+       } else {
+               if( mode != 0 ) {
+                       chmod( tfname, mode );
+               }
+               //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
+               if( (state = rename( tfname, new )) < 0 ) {
+                       logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
+               } else {
+                       if( unlink( old ) < 0 ) {
+                               logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
+                       }
+               }
+       }
+
+       return state < 0 ? -1 : 0;
+}
+
+/*
+       Attempt to rename (move) the old file to the new file. If that fails with
+       the error EXDEV (not on same device), then we will copy the file. All
+       other errors returned by the rename() command are considered fatal and
+       copy is not attempted.  Returns 0 on success, -1 with errno set on 
+       failure. See man page for rename for possible errno values and meanings.
+*/
+static int mvocp( char* old, char* new ) {
+
+       chmod( old, 0644 );                                                             // give it proper mode before moving it
+
+       if( rename( old, new ) < 0 ) {
+               if( errno != EXDEV ) {
+                       logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
+                       return -1;
+               }
+
+               return copy_unlink( old, new, 0644 );
+       }
+       
+       return 0;
+}
+
+/*
+       Opens a new file for writing. Returns the fd. The context fname field will
+       in the context point to the basename (no suffix) on return.
+*/
+static int rdc_open( void* vctx ) {
+       char    fname[2048];
+       char    basename[2048];
+       int             fd;
+       time_t  ts;                             // time stamp, rounded to previous frequency
+       rdc_ctx_t* ctx;
+
+       ctx = (rdc_ctx_t *) vctx;
+       
+       
+       if( ctx == NULL ) {
+               return -1;
+       }
+
+       ts = time( NULL );
+       ts = ts - (ts % ctx->frequency);                        // round to previous frequency
+       ctx->next_roll = ts + ctx->frequency;           // set next time to roll the file
+
+       snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts );            // basename needed to build final file name at close
+       snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
+       fd = open( fname, O_WRONLY | O_CREAT, 0200 );           // open in w-- mode so that it should not be readable
+       if( fd < 0 ) {
+               logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
+               return fd;                                                                              // leave errno set by open attempt
+       }
+
+       lseek( fd, 0, SEEK_END );                                                       // if file existed, continue appending to it
+       logit( LOG_INFO, "(rdf) now writing to: %s", fname );
+       ctx->openname = strdup( fname );
+       ctx->basename = strdup( basename );
+       ctx->fd = fd;
+
+       return fd;
+}
+
+// ------------------ public things -------------------------------------------------------
+
+/*
+       A generic log function such that if pressed to write logs using the log
+       library which yields completely unreadable json messages, we can comply
+       yet still have readable messages when testing. If the message type LOG_STAT
+       then the message is written to stdout rather than stderr.
+*/
+extern void logit( int priority, char *fmt, ... ) {
+    va_list argp;
+    char ubuf[4*1024+1];                               // build user message here
+    char* pstr = "UNK";                                        // priority string
+       FILE*   dest = stderr;                          // where to write the message
+
+       switch( priority ) {
+               case LOG_STAT:
+                       pstr = "STAT";
+                       dest = stdout;
+                       break;
+
+               case LOG_CRIT:
+                       pstr = "CRI";
+                       break;
+
+               case LOG_ERR:
+                       pstr = "ERR";
+                       break;
+
+               case LOG_WARN:
+                       pstr = "WARN";
+                       break;
+
+               default:
+                       pstr = "INFO";
+                       break;
+       }
+
+    va_start( argp, fmt );                                                             // point at first variable arguement
+    vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp );   // build the user portion of the message
+    va_end( argp );
+
+    fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
+}
+
+
+/*
+       Initialise the raw data collection facility.  The two directories, staging and final, may be the 
+       same, and if different they _should_ reside on the same filesystem such that a move
+       (rename) operation is only the change in the directory and does not involve the copy
+       of bytes.
+
+       Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
+       we assume that once the file is moved from staging to final, we create an empty
+       "done" file using the dsuffix.
+
+       A pointer to the context is returned; nil on error with errno set to some useful
+       (we hope) value.
+*/
+extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
+       rdc_ctx_t*      ctx;
+       char*           ep;                     // pointer at environment var value
+
+       ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
+       if( ctx == NULL ) {
+               errno = ENOMEM;
+               return NULL;
+       }
+       memset( ctx, 0, sizeof( *ctx ) );
+
+       if( sdir == NULL ) {
+               if( fdir == NULL ) {
+                       errno = EINVAL;         // must have at least one of these
+                       free( ctx );
+                       return NULL;
+               }
+
+               ctx->sdir = strdup( fdir );
+               ctx->fdir = strdup( fdir );
+       } else {
+               ctx->sdir = strdup( sdir );
+               if( fdir == NULL ) {
+                       ctx->fdir = strdup( sdir );
+               } else {
+                       ctx->fdir = strdup( fdir );
+               }
+               
+       }
+
+       if( suffix ) {
+               ctx->suffix = strdup( suffix );
+       }
+       if( dsuffix ) {
+               ctx->dsuffix = strdup( dsuffix );
+       }
+
+       if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
+               ctx->source = strdup( ep );
+       } else {
+               ctx->source = strdup( "" );
+       }
+
+       ctx->frequency = 300;                   // default to 5 min roll
+       ctx->fd = -1;
+
+       return (void *) ctx;
+}
+
+/*
+       Allow the file rotation frequency to be set to something other
+       than the default.  A minimum of 10s is enforced, but there is 
+       no maximum.
+*/
+extern void rdc_set_freq( void* vctx, int freq ) {
+       rdc_ctx_t* ctx;
+
+       ctx = (rdc_ctx_t *) vctx;
+       
+       if( ctx != NULL && freq > 10 ) {
+               ctx->frequency = freq;
+       }
+
+       logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
+}
+
+/*
+       Close the currently open file and move it to it's final resting place in fdir.
+       If the done suffix in the context is not nil, then we touch a done file which
+       has the same basename with the done suffix; this is also placed into the final
+       dir.
+*/
+extern void rdc_close( void* vctx ) {
+       char    target[2048];
+       char*   t_suffix;
+       int             fd;
+       rdc_ctx_t* ctx;
+
+       ctx = (rdc_ctx_t *) vctx;
+       if( ctx == NULL || ctx->fd < 0 ) {
+               return;
+       }
+
+       close( ctx->fd );               // future -- check for error
+       ctx->fd = -1;
+
+       t_suffix =  ctx->suffix != NULL  ? ctx->suffix : "";
+       snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix );            // final target name
+       if( mvocp( ctx->openname, target ) < 0 ) {
+               logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
+       } else {
+               logit( LOG_INFO, "capture file closed and moved to: %s", target );
+               if( ctx->dsuffix != NULL ) {                            // must also create a done file
+                       snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
+                       if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
+                               close( fd );
+                               logit( LOG_INFO, "created done file: %s", target );
+                       } else {
+                               logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
+                       }
+               }
+       }
+
+       free( ctx->basename );
+       free( ctx->openname );
+       ctx->basename = NULL;
+       ctx->openname = NULL;
+}
+
+/*
+       Writes the payload using the previously initialised capture buffer.
+       If it's time to roll the file, or the file isn't opened, the needed housekeeping
+       is done first.
+*/
+extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
+       cap_buf_t* cb;
+       char    header[100];                                    // our header
+       rdc_ctx_t* ctx;
+
+       cb = (cap_buf_t *) vcb;
+       ctx = (rdc_ctx_t *) vctx;
+       if( ctx == NULL || cb == NULL) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       if( time( NULL ) >= ctx->next_roll ) {
+               rdc_close( ctx );                                       // close up the current one
+       }
+
+       if( ctx->fd < 0 ) {
+               rdc_open( ctx );                                        // need a file, get it open
+       }       
+
+       snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
+       write( ctx->fd, header, 20 );
+       write( ctx->fd, cb->uheader, cb->uhlen );
+       write( ctx->fd, payload, len );
+
+       return 0;               // future -- check and return error
+}
+
+/*
+       Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
+       We save the message type, and will use that and the user header length and payload
+       length on write to create the complete RDC header.
+*/
+extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) { 
+       cap_buf_t* cb;
+
+       cb = (cap_buf_t *) vcb;
+       if( cb == NULL ) {
+               cb = (cap_buf_t *) malloc( sizeof( *cb ) );
+               if( cb == NULL ) {
+                       errno = ENOMEM;
+                       return NULL;
+               }
+       }       
+
+       cb->mtype = mtype;
+       if(  uhlen > sizeof( cb->uheader ) ) {
+               uhlen = sizeof( uheader );
+       }
+       memcpy( cb->uheader, uheader, uhlen );
+       cb->uhlen = uhlen;
+
+       return (void *) cb;
+}
+
+#ifdef SELF_TEST
+/*
+       Run some quick checks which require the directories in /tmp to exist, and some 
+       manual verification on the part of the tester.
+*/
+int main( ) {
+       void*   ctx;                    // context
+       void*   cb = NULL;              // capture buffere
+       char*   uheader;
+       char*   payload;
+       int     i;
+
+       ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL );                     // does not create done files
+       //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" );        // will create a "done" file
+       if( ctx == NULL ) {
+               logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer\n" );
+               exit( 1 );
+       }
+
+       rdc_set_freq( ctx, 60 );
+
+       logit( LOG_INFO, "<TEST> writing three messages" );
+       for( i = 0; i < 3; i++ ) {
+               uheader = "@MRC==len==*---timestamp---*";
+               cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
+               payload = "this is a test and just a test if it were not a test you'd be notified";
+               rdc_write( ctx, cb, payload, strlen( payload ) +1 );
+
+               sleep( 1 );
+       }
+
+       logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
+       sleep( 80 );
+       logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
+       for( i = 0; i < 13; i++ ) {
+               uheader = "@MRC==len==*---timestamp---*";
+               cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
+               payload = "this is a test and just a test if it were not a test you'd be notified";
+               rdc_write( ctx, cb, payload, strlen( payload ) +1 );
+
+               sleep( 1 );
+       }
+
+       rdc_close( ctx );                       // force move of the current file before exit
+}
+#endif
index e293d49..7429336 100755 (executable)
 # Author:      E. Scott Daniels
 # ----------------------------------------------------------------------
 
+# set the various sleep values based on long test or short test
+function set_wait_values {
+       if (( long_test ))
+       then
+               export MCL_RDC_FREQ=13          # file cycle after 13s
+               sender_wait=100
+               listener_wait=105
+               reader_wait=102
+               main_wait=120
+       else
+               sender_wait=10
+               listener_wait=15
+               reader_wait=12
+               main_wait=20
+       fi
+}
+
 # run sender at a 2 msg/sec rate (500000 musec delay)
 # sender sends msg types 0-6 and the route table in /tmp
 # will direct them to the listener. We also need to switch
@@ -42,7 +59,7 @@ function run_sender {
        echo "starting sender"
        RMR_SEED_RT=/tmp/local.rt RMR_RTG_SVC=9989 /playpen/bin/sender 43086 10000 >/tmp/sender.log 2>&1 &
        spid=$!
-       sleep 10
+       sleep $sender_wait
 
        echo "stopping sender"
        kill -15 $spid
@@ -53,7 +70,7 @@ function run_listener {
        /playpen/bin/mc_listener $ext_hdr -r 1 -d $fifo_dir >/tmp/listen.log 2>&1 &
        lpid=$!
 
-       sleep 15
+       sleep $listener_wait
        echo "stopping listener"
        kill -15 $lpid
 }
@@ -65,7 +82,7 @@ function run_pr {
        #/playpen/bin/pipe_reader -m $1 -d $fifo_dir & # >/tmp/pr.$1.log 2>&1 
        typeset prpid=$!
        
-       sleep 12
+       sleep $reader_wait
        echo "stopping pipe reader $1"
        kill -1 $prpid
 }
@@ -88,6 +105,42 @@ endKat
 # ---- run everything ---------------------------------------------------
 
 ext_hdr=""                                     # run with extended header enabled (-e turns extended off)
+long_test=0
+raw_capture=1
+while [[ $1 == -* ]]
+do
+       case $1 in 
+               -l)     long_test=1;;
+               -n)     raw_capture=0;;
+               *)      echo "$1 is not a recognised option"
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+set_wait_values
+
+if (( ! raw_capture ))         # -n set, turn off capture
+then
+       export MCL_RDC_ENABLE=0
+fi
+
+if [[ -d /data/final ]]                        # assume if we find data that final directory goes here
+then
+       echo "### found /data/final using that as final directory"
+       export MCL_RDC_FINAL=/data/final
+fi
+
+if [[ -d /data/stage ]]
+then
+       echo "### found /data/staging using that as stage directory"
+       export MCL_RDC_STAGE=/data/stage
+fi
+final_dir=${MCL_RDC_FINAL:-/tmp/rdc/final}
+stage_dir=${MCL_RDC_STAGE:-/tmp/rdc/stage}
+
 fifo_dir=/tmp/fifos
 mkdir -p $fifo_dir                     # redirect fifos so we don't depend on mount
 
@@ -102,7 +155,7 @@ done
 sleep 1
 run_sender &
 
-sleep 20                       # long enough for all functions to finish w/o having to risk a wait hanging
+sleep $main_wait                       # long enough for all functions to finish w/o having to risk a wait hanging
 echo "all functions stopped; looking at logs"
 
 # ---------- validation -------------------------------------------------
@@ -147,6 +200,40 @@ else
        echo "[OK]    Found expected fifos"
 fi
 
+if (( raw_capture ))           # not an error if not capturing
+then
+       if [[ -d $stage_dir ]]
+       then
+               echo "[OK]    Found staging direcory ($stage_dir)"
+               ls -al $stage_dir
+       else
+               (( errors++ ))
+               echo "[FAIL]  No staging directory found ($stage_dir)"
+       fi
+
+
+       if [[ -d $final_dir ]]
+       then
+               echo "[OK]    Found final direcory ($final_dir)"
+               ls -al $final_dir
+       
+               if (( long_test ))              # look for files in final dir to ensure roll
+               then
+                       found=$( ls $final_dir/MC* | wc -l )
+                       if (( found > 0 ))
+                       then
+                               echo "[OK]   Found $found files in final directory ($final_dir)"
+                       else
+                               echo "[FAIL] Did not find any files in the final directory ($final_dir)"
+                               (( errors++ ))
+                       fi
+               fi
+       else
+               (( errors++ ))
+               echo "[FAIL]  No final directory found"
+       fi
+fi
+
 if (( errors ))
 then
        echo "[FAIL] $errors errors noticed"