1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: This module contains library functions which implement the
23 raw data collection (rdc) portion of the listener. The fanout
24 function in the library will call these functions to capture
25 raw messages, with a header, for later "replay" or other
26 analysis. Messages are captured as they are written to the
27 FIFO, with a small header added: @RDC<mtype>*<len>
30 Where <mtype> is the message type of the message received and
31 <len> is the length of the data that was written to the FIFO.
35 Author: E. Scott Daniels
/*
	Capture buffer. The listener writes FIFO output in two stages, so a capture
	is first initialised with the message type and the MRL (user) header, and the
	payload is written later using what was saved here. This keeps the caller
	from having to hold on to the header between the two stages.
*/
typedef struct {
	char	uheader[100];		// copy of the user header; at most 100 bytes are kept
	int		uhlen;				// number of bytes of uheader that are in use
	int		mtype;				// message type of the message being captured
} cap_buf_t;
/*
	Raw data collection context. One of these is allocated by rdc_init() and is
	handed back (as a void pointer) on every other rdc_* call.
*/
typedef struct {
	int		flags;				// DFFL_* constants
	int		frequency;			// the frequency (seconds) at which files are rolled
	int		fd;					// file currently open for writing
	char*	sdir;				// staging directory
	char*	fdir;				// final directory
	char*	suffix;				// suffix for the final file (must include the '.' if one is needed)
	char*	dsuffix;			// suffix for the "done" file
	char*	basename;			// base name of the file being written to
	char*	openname;			// full name of the file that is open for writing
	char*	source;				// added to output file names to differentiate the source
	time_t	next_roll;			// time at which the file should next be rolled
} rdc_ctx_t;

#define RDC_DELIM	"@RDC"		// delimiter used in our file
78 // -------------------------------------------------------------------------------------------
81 Copy and unlink old file is successful. During writing the file mode will
82 be write only for the owner (0200). If the mode passed in is not 0, then
83 just prior to renaming the file to 'new', the mode will be changed. If
84 mode is 0, then we assume the caller will change the file mode when
87 There seems to be an issue with some collectors and thus it is required
88 to initially name the file with a leading dot (.) until the file is closed
89 and ready to be read by external processes (marking it write-only seems
90 not to discourage them from trying!).
92 static int copy_unlink( char* old, char* new, int mode ) {
93 char buf[8192]; // read buffer
94 char* tfname; // temp file name while we have it open
95 char* wbuf; // work buffer for disecting the new filename
96 char* tok; // token pointer into a buffer
98 int rfd; // read/write file descriptors
102 int remain; // number of bytes remaining to write
106 if( (rfd = open( old, O_RDONLY )) < 0 ) {
107 logit( LOG_ERR, "copy: open src for copy failed: %s: %s", old, strerror( errno ) );
111 len = sizeof( char ) * (strlen( new ) + 2 ); // space needed for temp file name with added .
112 tfname = (char *) malloc( len );
113 wbuf = strdup( new ); // we need to trash the string, so copy
114 tok = strrchr( wbuf, '/' ); // find end of path
118 snprintf( tfname, len, "%s/.%s", wbuf, tok ); // insert . to "hide" from collector
120 snprintf( tfname, len, ".%s", wbuf ); // no path, just add leading .
123 //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
125 if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
126 logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s", tfname, strerror( errno ) );
130 while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
133 while( remain > 0 ) {
135 if( (len = write( wfd, &buf[start], len )) != remain ) { // short write
136 if( errno != EINTR && errno != EAGAIN ) {
137 logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
144 remain -= len; // recompute what we need to write, and try again
149 state = close( wfd );
153 logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
156 chmod( tfname, mode );
158 //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
159 if( (state = rename( tfname, new )) < 0 ) {
160 logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
162 if( unlink( old ) < 0 ) {
163 logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
169 return state < 0 ? -1 : 0;
173 Attempt to rename (move) the old file to the new file. If that fails with
174 the error EXDEV (not on same device), then we will copy the file. All
175 other errors returned by the rename() command are considered fatal and
176 copy is not attempted. Returns 0 on success, -1 with errno set on
177 failure. See man page for rename for possible errno values and meanings.
179 static int mvocp( char* old, char* new ) {
181 chmod( old, 0644 ); // give it proper mode before moving it
183 if( rename( old, new ) < 0 ) {
184 if( errno != EXDEV ) {
185 logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
189 return copy_unlink( old, new, 0644 );
196 Opens a new file for writing. Returns the fd. The context fname field will
197 in the context point to the basename (no suffix) on return.
199 static int rdc_open( void* vctx ) {
203 time_t ts; // time stamp, rounded to previous frequency
206 ctx = (rdc_ctx_t *) vctx;
214 ts = ts - (ts % ctx->frequency); // round to previous frequency
215 ctx->next_roll = ts + ctx->frequency; // set next time to roll the file
217 snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts ); // basename needed to build final file name at close
218 snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
219 fd = open( fname, O_WRONLY | O_CREAT, 0200 ); // open in w-- mode so that it should not be readable
221 logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
222 return fd; // leave errno set by open attempt
225 lseek( fd, 0, SEEK_END ); // if file existed, continue appending to it
226 logit( LOG_INFO, "(rdf) now writing to: %s", fname );
227 ctx->openname = strdup( fname );
228 ctx->basename = strdup( basename );
234 // ------------------ public things -------------------------------------------------------
237 A generic log function such that if pressed to write logs using the log
238 library which yields completely unreadable json messages, we can comply
239 yet still have readable messages when testing. If the priority is LOG_STAT
240 then the message is written to stdout rather than stderr.
// Write one log line built from a printf style format and its arguments.
//   priority: one of the LOG_* values passed by the callers in this file (e.g. LOG_ERR, LOG_INFO).
//   fmt, ...: format string and the values that it references.
// The line written is "<time> [<priority string>] <message>" followed by a newline.
// NOTE(review): the code which maps priority to pstr and dest, and the declaration of argp,
// are not visible in the lines under review; confirm that every LOG_* value used by callers is mapped.
242 extern void logit( int priority, char *fmt, ... ) {
244 char ubuf[4*1024+1]; // build user message here; longer messages are truncated by vsnprintf
245 char* pstr = "UNK"; // priority string; this default is what prints unless it is replaced before the fprintf
246 FILE* dest = stderr; // where to write the message; stderr unless replaced before the fprintf
271 va_start( argp, fmt ); // point at first variable argument
// NOTE(review): a matching va_end( argp ) is not visible after the vsnprintf call; confirm that it exists.
272 vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp ); // build the user portion of the message
// NOTE(review): time() returns time_t but the format is %ld; consider a (long) cast as is done in rdc_open.
275 fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
280 Initialise the raw data collection facility. The two directories, staging and final, may be the
281 same, and if different they _should_ reside on the same filesystem such that a move
282 (rename) operation is only the change in the directory and does not involve the copy
285 Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
286 we assume that once the file is moved from staging to final, we create an empty
287 "done" file using the dsuffix.
289 A pointer to the context is returned; nil on error with errno set to some useful
292 extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
294 char* ep; // pointer at environment var value
296 ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
301 memset( ctx, 0, sizeof( *ctx ) );
305 errno = EINVAL; // must have at least one of these
310 ctx->sdir = strdup( fdir );
311 ctx->fdir = strdup( fdir );
313 ctx->sdir = strdup( sdir );
315 ctx->fdir = strdup( sdir );
317 ctx->fdir = strdup( fdir );
323 ctx->suffix = strdup( suffix );
326 ctx->dsuffix = strdup( dsuffix );
329 if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
330 ctx->source = strdup( ep );
332 ctx->source = strdup( "" );
335 ctx->frequency = 300; // default to 5 min roll
342 Allow the file rotation frequency to be set to something other
343 than the default. A minimum of 10s is enforced, but there is
346 extern void rdc_set_freq( void* vctx, int freq ) {
349 ctx = (rdc_ctx_t *) vctx;
351 if( ctx != NULL && freq >= 10 ) {
352 ctx->frequency = freq;
353 logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
355 logit( LOG_ERR, "(rdc) file roll frequency was not set; ctx was nill or frequency was less than 10s" );
360 Close the currently open file and move it to it's final resting place in fdir.
361 If the done suffix in the context is not nil, then we touch a done file which
362 has the same basename with the done suffix; this is also placed into the final
365 extern void rdc_close( void* vctx ) {
371 ctx = (rdc_ctx_t *) vctx;
372 if( ctx == NULL || ctx->fd < 0 ) {
376 close( ctx->fd ); // future -- check for error
379 t_suffix = ctx->suffix != NULL ? ctx->suffix : "";
380 snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix ); // final target name
381 if( mvocp( ctx->openname, target ) < 0 ) {
382 logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
384 logit( LOG_INFO, "capture file closed and moved to: %s", target );
385 if( ctx->dsuffix != NULL ) { // must also create a done file
386 snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
387 if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
389 logit( LOG_INFO, "created done file: %s", target );
391 logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
396 free( ctx->basename );
397 free( ctx->openname );
398 ctx->basename = NULL;
399 ctx->openname = NULL;
403 Writes the payload using the previously initialised capture buffer.
404 If it's time to roll the file, or the file isn't opened, the needed housekeeping
407 extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
409 char header[100]; // our header
412 cb = (cap_buf_t *) vcb;
413 ctx = (rdc_ctx_t *) vctx;
414 if( ctx == NULL || cb == NULL) {
419 if( time( NULL ) >= ctx->next_roll ) {
420 rdc_close( ctx ); // close up the current one
424 rdc_open( ctx ); // need a file, get it open
427 snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
428 write( ctx->fd, header, 20 );
429 write( ctx->fd, cb->uheader, cb->uhlen );
430 write( ctx->fd, payload, len );
432 return 0; // future -- check and return error
436 Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
437 We save the message type, and will use that and the user header length and payload
438 length on write to create the complete RDC header.
440 extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
443 cb = (cap_buf_t *) vcb;
445 cb = (cap_buf_t *) malloc( sizeof( *cb ) );
453 if( uhlen > sizeof( cb->uheader ) ) {
454 uhlen = sizeof( uheader );
456 memcpy( cb->uheader, uheader, uhlen );
464 Run some quick checks which require the directories in /tmp to exist, and some
465 manual verification on the part of the tester.
468 void* ctx; // context
469 void* cb = NULL; // capture buffere
474 ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL ); // does not create done files
475 //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" ); // will create a "done" file
477 logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer" );
481 rdc_set_freq( ctx, 60 );
483 logit( LOG_INFO, "<TEST> writing three messages" );
484 for( i = 0; i < 3; i++ ) {
485 uheader = "@MRC==len==*---timestamp---*";
486 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
487 payload = "this is a test and just a test if it were not a test you'd be notified";
488 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
493 logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
495 logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
496 for( i = 0; i < 13; i++ ) {
497 uheader = "@MRC==len==*---timestamp---*";
498 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
499 payload = "this is a test and just a test if it were not a test you'd be notified";
500 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
505 rdc_close( ctx ); // force move of the current file before exit