--- /dev/null
+// vim: ts=4 sw=4 noet:
+/*
+--------------------------------------------------------------------------------
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+--------------------------------------------------------------------------------
+*/
+
+/*
+ Mnemonic: rdc.c.
+ Abstract: This module contains library functions which implement the
+ raw data collection (rdc) portion of the listener. The fanout
+ function in the library will call these functions to capture
+ raw messages, with a header, for later "replay" or other
+ analysis. Messages are captured as they are written to the
+ FIFO, with a small header added:
+ @RDC<mtype><len>
+
+ Where <mtype> is the message type of the message received and
+ <len> is the length of the data that was written to the FIFO.
+
+
+ Date: 06 Oct 2019
+ Author: E. Scott Daniels
+*/
+
+#include <stdarg.h>
+#include <errno.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+#include <sys/stat.h>
+
+#include "mcl.h"
+
+/*
+ A capture buffer. The listener writes FIFO output in two stages, thus
+ we provide the ability to initialise a capture with the msg type and
+ the MRL header, then to write the payload using this saved data. The
+ idea is to not require the caller to save the header.
+*/
+typedef struct {
+ char uheader[100]; // user header (max 100 bytes)
+ int uhlen; // length of user header
+ int mtype; // message type
+} cap_buf_t;
+
+typedef struct {
+ int flags; // DFFL_* constatnts
+ int frequency; // the frequency at which files are rolled
+ int fd; // current open write file
+ char* sdir; // staging directory
+ char* fdir; // final directory
+ char* suffix; // suffix for final file (must include . if needed)
+ char* dsuffix; // suffix for done file
+ char* basename; // base name of the file being written to
+ char* openname; // full filename that is open for writing
+ char* source; // added to output file names to differentiate the source
+ time_t next_roll; // time we should roll the file
+} rdc_ctx_t;
+
+#define RDC_DELIM "@RDC" // delimeter used in our file
+
+// -------------------------------------------------------------------------------------------
+
+/*
+ Copy and unlink old file is successful. During writing the file mode will
+ be write only for the owner (0200). If the mode passed in is not 0, then
+ just prior to renaming the file to 'new', the mode will be changed. If
+ mode is 0, then we assume the caller will change the file mode when
+ appropriate.
+
+ There seems to be an issue with some collectors and thus it is required
+ to initially name the file with a leading dot (.) until the file is closed
+ and ready to be read by external processes (marking it write only seems
+ not to discourage them from trying!).
+*/
+static int copy_unlink( char* old, char* new, int mode ) {
+ char buf[8192]; // read buffer
+ char* tfname; // temp file name while we have it open
+ char* wbuf; // work buffer for disecting the new filename
+ char* tok; // token pointer into a buffer
+ int len;
+ int rfd; // read/write file descriptors
+ int wfd;
+ int start;
+ int state;
+ int remain; // number of bytes remaining to write
+
+
+ errno = 0;
+ if( (rfd = open( old, O_RDONLY )) < 0 ) {
+ logit( LOG_ERR, "copy: open src for copy failed: %s: %s\n", old, strerror( errno ) );
+ return -1;
+ }
+
+ len = sizeof( char ) * (strlen( new ) + 2 ); // space needed for temp file name with added .
+ tfname = (char *) malloc( len );
+ wbuf = strdup( new ); // we need to trash the string, so copy
+ tok = strrchr( wbuf, '/' ); // find end of path
+ if( tok ) {
+ *tok = 0;
+ tok++;
+ snprintf( tfname, len, "%s/.%s", wbuf, tok ); // insert . to "hide" from collector
+ } else {
+ snprintf( tfname, len, ".%s", wbuf ); // no path, just add leading .
+ }
+ //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
+
+ if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
+ logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s\n", tfname, strerror( errno ) );
+ return -1;
+ }
+
+ while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
+ remain = len;
+ start = 0;
+ while( remain > 0 ) {
+ errno = 0;
+ if( (len = write( wfd, &buf[start], len )) != remain ) { // short write
+ if( errno != EINTR && errno != EAGAIN ) {
+ logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
+ close( wfd );
+ close( rfd );
+ return -1;
+ }
+ }
+
+ remain -= len; // recompute what we need to write, and try again
+ start += len;
+ }
+ }
+
+ state = close( wfd );
+ close( rfd );
+
+ if( state < 0 ) {
+ logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
+ } else {
+ if( mode != 0 ) {
+ chmod( tfname, mode );
+ }
+ //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
+ if( (state = rename( tfname, new )) < 0 ) {
+ logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
+ } else {
+ if( unlink( old ) < 0 ) {
+ logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
+ }
+ }
+ }
+
+ return state < 0 ? -1 : 0;
+}
+
+/*
+ Attempt to rename (move) the old file to the new file. If that fails with
+ the error EXDEV (not on same device), then we will copy the file. All
+ other errors returned by the rename() command are considered fatal and
+ copy is not attempted. Returns 0 on success, -1 with errno set on
+ failure. See man page for rename for possible errno values and meanings.
+*/
+static int mvocp( char* old, char* new ) {
+
+ chmod( old, 0644 ); // give it proper mode before moving it
+
+ if( rename( old, new ) < 0 ) {
+ if( errno != EXDEV ) {
+ logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
+ return -1;
+ }
+
+ return copy_unlink( old, new, 0644 );
+ }
+
+ return 0;
+}
+
+/*
+ Opens a new file for writing. Returns the fd. The context fname field will
+ in the context point to the basename (no suffix) on return.
+*/
+static int rdc_open( void* vctx ) {
+ char fname[2048];
+ char basename[2048];
+ int fd;
+ time_t ts; // time stamp, rounded to previous frequency
+ rdc_ctx_t* ctx;
+
+ ctx = (rdc_ctx_t *) vctx;
+
+
+ if( ctx == NULL ) {
+ return -1;
+ }
+
+ ts = time( NULL );
+ ts = ts - (ts % ctx->frequency); // round to previous frequency
+ ctx->next_roll = ts + ctx->frequency; // set next time to roll the file
+
+ snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts ); // basename needed to build final file name at close
+ snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
+ fd = open( fname, O_WRONLY | O_CREAT, 0200 ); // open in w-- mode so that it should not be readable
+ if( fd < 0 ) {
+ logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
+ return fd; // leave errno set by open attempt
+ }
+
+ lseek( fd, 0, SEEK_END ); // if file existed, continue appending to it
+ logit( LOG_INFO, "(rdf) now writing to: %s", fname );
+ ctx->openname = strdup( fname );
+ ctx->basename = strdup( basename );
+ ctx->fd = fd;
+
+ return fd;
+}
+
+// ------------------ public things -------------------------------------------------------
+
+/*
+ A generic log function such that if pressed to write logs using the log
+ library which yields completely unreadable json messages, we can comply
+ yet still have readable messages when testing. If the message type LOG_STAT
+ then the message is written to stdout rather than stderr.
+*/
+extern void logit( int priority, char *fmt, ... ) {
+ va_list argp;
+ char ubuf[4*1024+1]; // build user message here
+ char* pstr = "UNK"; // priority string
+ FILE* dest = stderr; // where to write the message
+
+ switch( priority ) {
+ case LOG_STAT:
+ pstr = "STAT";
+ dest = stdout;
+ break;
+
+ case LOG_CRIT:
+ pstr = "CRI";
+ break;
+
+ case LOG_ERR:
+ pstr = "ERR";
+ break;
+
+ case LOG_WARN:
+ pstr = "WARN";
+ break;
+
+ default:
+ pstr = "INFO";
+ break;
+ }
+
+ va_start( argp, fmt ); // point at first variable arguement
+ vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp ); // build the user portion of the message
+ va_end( argp );
+
+ fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
+}
+
+
+/*
+ Initialise the raw data collection facility. The two directories, staging and final, may be the
+ same, and if different they _should_ reside on the same filesystem such that a move
+ (rename) operation is only the change in the directory and does not involve the copy
+ of bytes.
+
+ Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
+ we assume that once the file is moved from staging to final, we create an empty
+ "done" file using the dsuffix.
+
+ A pointer to the context is returned; nil on error with errno set to some useful
+ (we hope) value.
+*/
+extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
+ rdc_ctx_t* ctx;
+ char* ep; // pointer at environment var value
+
+ ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
+ if( ctx == NULL ) {
+ errno = ENOMEM;
+ return NULL;
+ }
+ memset( ctx, 0, sizeof( *ctx ) );
+
+ if( sdir == NULL ) {
+ if( fdir == NULL ) {
+ errno = EINVAL; // must have at least one of these
+ free( ctx );
+ return NULL;
+ }
+
+ ctx->sdir = strdup( fdir );
+ ctx->fdir = strdup( fdir );
+ } else {
+ ctx->sdir = strdup( sdir );
+ if( fdir == NULL ) {
+ ctx->fdir = strdup( sdir );
+ } else {
+ ctx->fdir = strdup( fdir );
+ }
+
+ }
+
+ if( suffix ) {
+ ctx->suffix = strdup( suffix );
+ }
+ if( dsuffix ) {
+ ctx->dsuffix = strdup( dsuffix );
+ }
+
+ if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
+ ctx->source = strdup( ep );
+ } else {
+ ctx->source = strdup( "" );
+ }
+
+ ctx->frequency = 300; // default to 5 min roll
+ ctx->fd = -1;
+
+ return (void *) ctx;
+}
+
+/*
+ Allow the file rotation frequency to be set to something other
+ than the default. A minimum of 10s is enforced, but there is
+ no maximum.
+*/
+extern void rdc_set_freq( void* vctx, int freq ) {
+ rdc_ctx_t* ctx;
+
+ ctx = (rdc_ctx_t *) vctx;
+
+ if( ctx != NULL && freq > 10 ) {
+ ctx->frequency = freq;
+ }
+
+ logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
+}
+
+/*
+ Close the currently open file and move it to it's final resting place in fdir.
+ If the done suffix in the context is not nil, then we touch a done file which
+ has the same basename with the done suffix; this is also placed into the final
+ dir.
+*/
+extern void rdc_close( void* vctx ) {
+ char target[2048];
+ char* t_suffix;
+ int fd;
+ rdc_ctx_t* ctx;
+
+ ctx = (rdc_ctx_t *) vctx;
+ if( ctx == NULL || ctx->fd < 0 ) {
+ return;
+ }
+
+ close( ctx->fd ); // future -- check for error
+ ctx->fd = -1;
+
+ t_suffix = ctx->suffix != NULL ? ctx->suffix : "";
+ snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix ); // final target name
+ if( mvocp( ctx->openname, target ) < 0 ) {
+ logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
+ } else {
+ logit( LOG_INFO, "capture file closed and moved to: %s", target );
+ if( ctx->dsuffix != NULL ) { // must also create a done file
+ snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
+ if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
+ close( fd );
+ logit( LOG_INFO, "created done file: %s", target );
+ } else {
+ logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
+ }
+ }
+ }
+
+ free( ctx->basename );
+ free( ctx->openname );
+ ctx->basename = NULL;
+ ctx->openname = NULL;
+}
+
+/*
+ Writes the payload using the previously initialised capture buffer.
+ If it's time to roll the file, or the file isn't opened, the needed housekeeping
+ is done first.
+*/
+extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
+ cap_buf_t* cb;
+ char header[100]; // our header
+ rdc_ctx_t* ctx;
+
+ cb = (cap_buf_t *) vcb;
+ ctx = (rdc_ctx_t *) vctx;
+ if( ctx == NULL || cb == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if( time( NULL ) >= ctx->next_roll ) {
+ rdc_close( ctx ); // close up the current one
+ }
+
+ if( ctx->fd < 0 ) {
+ rdc_open( ctx ); // need a file, get it open
+ }
+
+ snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
+ write( ctx->fd, header, 20 );
+ write( ctx->fd, cb->uheader, cb->uhlen );
+ write( ctx->fd, payload, len );
+
+ return 0; // future -- check and return error
+}
+
+/*
+ Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
+ We save the message type, and will use that and the user header length and payload
+ length on write to create the complete RDC header.
+*/
+extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
+ cap_buf_t* cb;
+
+ cb = (cap_buf_t *) vcb;
+ if( cb == NULL ) {
+ cb = (cap_buf_t *) malloc( sizeof( *cb ) );
+ if( cb == NULL ) {
+ errno = ENOMEM;
+ return NULL;
+ }
+ }
+
+ cb->mtype = mtype;
+ if( uhlen > sizeof( cb->uheader ) ) {
+ uhlen = sizeof( uheader );
+ }
+ memcpy( cb->uheader, uheader, uhlen );
+ cb->uhlen = uhlen;
+
+ return (void *) cb;
+}
+
+#ifdef SELF_TEST
+/*
+ Run some quick checks which require the directories in /tmp to exist, and some
+ manual verification on the part of the tester.
+*/
+int main( ) {
+ void* ctx; // context
+ void* cb = NULL; // capture buffere
+ char* uheader;
+ char* payload;
+ int i;
+
+ ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL ); // does not create done files
+ //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" ); // will create a "done" file
+ if( ctx == NULL ) {
+ logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer\n" );
+ exit( 1 );
+ }
+
+ rdc_set_freq( ctx, 60 );
+
+ logit( LOG_INFO, "<TEST> writing three messages" );
+ for( i = 0; i < 3; i++ ) {
+ uheader = "@MRC==len==*---timestamp---*";
+ cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
+ payload = "this is a test and just a test if it were not a test you'd be notified";
+ rdc_write( ctx, cb, payload, strlen( payload ) +1 );
+
+ sleep( 1 );
+ }
+
+ logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
+ sleep( 80 );
+ logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
+ for( i = 0; i < 13; i++ ) {
+ uheader = "@MRC==len==*---timestamp---*";
+ cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
+ payload = "this is a test and just a test if it were not a test you'd be notified";
+ rdc_write( ctx, cb, payload, strlen( payload ) +1 );
+
+ sleep( 1 );
+ }
+
+ rdc_close( ctx ); // force move of the current file before exit
+}
+#endif