1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: This module contains library functions which implement the
23 raw data collection (rdc) portion of the listener. The fanout
24 function in the library will call these functions to capture
25 raw messages, with a header, for later "replay" or other
26 analysis. Messages are captured as they are written to the
27 FIFO, with a small header added:
30 Where <mtype> is the message type of the message received and
31 <len> is the length of the data that was written to the FIFO.
35 Author: E. Scott Daniels
51 A capture buffer. The listener writes FIFO output in two stages, thus
52 we provide the ability to initialise a capture with the msg type and
53 the MRL header, then to write the payload using this saved data. The
54 idea is to not require the caller to save the header.
57 char uheader[100]; // user header (max 100 bytes)
58 int uhlen; // length of user header
59 int mtype; // message type
63 int flags; // DFFL_* constatnts
64 int frequency; // the frequency at which files are rolled
65 int fd; // current open write file
66 char* sdir; // staging directory
67 char* fdir; // final directory
68 char* suffix; // suffix for final file (must include . if needed)
69 char* dsuffix; // suffix for done file
70 char* basename; // base name of the file being written to
71 char* openname; // full filename that is open for writing
72 char* source; // added to output file names to differentiate the source
73 time_t next_roll; // time we should roll the file
76 #define RDC_DELIM "@RDC" // delimeter used in our file
78 // -------------------------------------------------------------------------------------------
81 Copy and unlink old file is successful. During writing the file mode will
82 be write only for the owner (0200). If the mode passed in is not 0, then
83 just prior to renaming the file to 'new', the mode will be changed. If
84 mode is 0, then we assume the caller will change the file mode when
87 There seems to be an issue with some collectors and thus it is required
88 to initially name the file with a leading dot (.) until the file is closed
89 and ready to be read by external processes (marking it write-only seems
90 not to discourage them from trying!).
92 static int copy_unlink( char* old, char* new, int mode ) {
93 char buf[8192]; // read buffer
94 char* tfname = NULL; // temp file name while we have it open
95 char* wbuf; // work buffer for disecting the new filename
96 char* tok; // token pointer into a buffer
98 int rfd; // read/write file descriptors
102 int remain; // number of bytes remaining to write
106 if( (rfd = open( old, O_RDONLY )) < 0 ) {
107 logit( LOG_ERR, "copy: open src for copy failed: %s: %s", old, strerror( errno ) );
111 len = sizeof( char ) * (strlen( new ) + 2 ); // space needed for temp file name with added .
112 tfname = (char *) malloc( len );
113 wbuf = strdup( new ); // we need to trash the string, so copy
114 tok = strrchr( wbuf, '/' ); // find end of path
118 snprintf( tfname, len, "%s/.%s", wbuf, tok ); // insert . to "hide" from collector
120 snprintf( tfname, len, ".%s", wbuf ); // no path, just add leading .
123 //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
125 if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
126 logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s", tfname, strerror( errno ) );
130 while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
133 while( remain > 0 ) {
135 if( (len = write( wfd, &buf[start], len )) != remain ) { // short write
136 if( errno != EINTR && errno != EAGAIN ) {
137 logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
145 remain -= len; // recompute what we need to write, and try again
150 state = close( wfd );
154 logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
157 chmod( tfname, mode );
159 //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
160 if( (state = rename( tfname, new )) < 0 ) {
161 logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
163 if( unlink( old ) < 0 ) {
164 logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
170 return state < 0 ? -1 : 0;
174 Attempt to rename (move) the old file to the new file. If that fails with
175 the error EXDEV (not on same device), then we will copy the file. All
176 other errors returned by the rename() command are considered fatal and
177 copy is not attempted. Returns 0 on success, -1 with errno set on
178 failure. See man page for rename for possible errno values and meanings.
180 static int mvocp( char* old, char* new ) {
182 chmod( old, 0644 ); // give it proper mode before moving it
184 if( rename( old, new ) < 0 ) {
185 if( errno != EXDEV ) {
186 logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
190 return copy_unlink( old, new, 0644 );
197 Opens a new file for writing. Returns the fd. The context fname field will
198 in the context point to the basename (no suffix) on return.
200 static int rdc_open( void* vctx ) {
204 time_t ts; // time stamp, rounded to previous frequency
207 ctx = (rdc_ctx_t *) vctx;
215 ts = ts - (ts % ctx->frequency); // round to previous frequency
216 ctx->next_roll = ts + ctx->frequency; // set next time to roll the file
218 snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts ); // basename needed to build final file name at close
219 snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
220 fd = open( fname, O_WRONLY | O_CREAT, 0200 ); // open in w-- mode so that it should not be readable
222 logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
223 return fd; // leave errno set by open attempt
226 lseek( fd, 0, SEEK_END ); // if file existed, continue appending to it
227 logit( LOG_INFO, "(rdf) now writing to: %s", fname );
228 ctx->openname = strdup( fname );
229 ctx->basename = strdup( basename );
235 // ------------------ public things -------------------------------------------------------
238 A generic log function such that if pressed to write logs using the log
239 library which yields completely unreadable json messages, we can comply
240 yet still have readable messages when testing. If the message type LOG_STAT
241 then the message is written to stdout rather than stderr.
243 extern void logit( int priority, char *fmt, ... ) {
245 char ubuf[4*1024+1]; // build user message here
246 char* pstr = "UNK"; // priority string
247 FILE* dest = stderr; // where to write the message
272 va_start( argp, fmt ); // point at first variable arguement
273 vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp ); // build the user portion of the message
276 fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
281 Initialise the raw data collection facility. The two directories, staging and final, may be the
282 same, and if different they _should_ reside on the same filesystem such that a move
283 (rename) operation is only the change in the directory and does not involve the copy
286 Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
287 we assume that once the file is moved from staging to final, we create an empty
288 "done" file using the dsuffix.
290 A pointer to the context is returned; nil on error with errno set to some useful
293 extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
295 char* ep; // pointer at environment var value
297 ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
302 memset( ctx, 0, sizeof( *ctx ) );
306 errno = EINVAL; // must have at least one of these
311 ctx->sdir = strdup( fdir );
312 ctx->fdir = strdup( fdir );
314 ctx->sdir = strdup( sdir );
316 ctx->fdir = strdup( sdir );
318 ctx->fdir = strdup( fdir );
324 ctx->suffix = strdup( suffix );
327 ctx->dsuffix = strdup( dsuffix );
330 if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
331 ctx->source = strdup( ep );
333 ctx->source = strdup( "" );
336 ctx->frequency = 300; // default to 5 min roll
343 Allow the file rotation frequency to be set to something other
344 than the default. A minimum of 10s is enforced, but there is
347 extern void rdc_set_freq( void* vctx, int freq ) {
350 ctx = (rdc_ctx_t *) vctx;
352 if( ctx != NULL && freq >= 10 ) {
353 ctx->frequency = freq;
354 logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
356 logit( LOG_ERR, "(rdc) file roll frequency was not set; ctx was nill or frequency was less than 10s" );
361 Close the currently open file and move it to it's final resting place in fdir.
362 If the done suffix in the context is not nil, then we touch a done file which
363 has the same basename with the done suffix; this is also placed into the final
366 extern void rdc_close( void* vctx ) {
372 ctx = (rdc_ctx_t *) vctx;
373 if( ctx == NULL || ctx->fd < 0 ) {
377 close( ctx->fd ); // future -- check for error
380 t_suffix = ctx->suffix != NULL ? ctx->suffix : "";
381 snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix ); // final target name
382 if( mvocp( ctx->openname, target ) < 0 ) {
383 logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
385 logit( LOG_INFO, "capture file closed and moved to: %s", target );
386 if( ctx->dsuffix != NULL ) { // must also create a done file
387 snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
388 if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
390 logit( LOG_INFO, "created done file: %s", target );
392 logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
397 free( ctx->basename );
398 free( ctx->openname );
399 ctx->basename = NULL;
400 ctx->openname = NULL;
404 Writes the payload using the previously initialised capture buffer.
405 If it's time to roll the file, or the file isn't opened, the needed housekeeping
408 extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
410 char header[100]; // our header
413 cb = (cap_buf_t *) vcb;
414 ctx = (rdc_ctx_t *) vctx;
415 if( ctx == NULL || cb == NULL) {
420 if( time( NULL ) >= ctx->next_roll ) {
421 rdc_close( ctx ); // close up the current one
425 rdc_open( ctx ); // need a file, get it open
428 snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
429 write( ctx->fd, header, 20 );
430 write( ctx->fd, cb->uheader, cb->uhlen );
431 write( ctx->fd, payload, len );
433 return 0; // future -- check and return error
437 Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
438 We save the message type, and will use that and the user header length and payload
439 length on write to create the complete RDC header.
441 extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
444 cb = (cap_buf_t *) vcb;
446 cb = (cap_buf_t *) malloc( sizeof( *cb ) );
454 if( uhlen > sizeof( cb->uheader ) ) {
455 uhlen = sizeof( uheader );
457 memcpy( cb->uheader, uheader, uhlen );
465 Run some quick checks which require the directories in /tmp to exist, and some
466 manual verification on the part of the tester.
469 void* ctx; // context
470 void* cb = NULL; // capture buffere
475 ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL ); // does not create done files
476 //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" ); // will create a "done" file
478 logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer" );
482 rdc_set_freq( ctx, 60 );
484 logit( LOG_INFO, "<TEST> writing three messages" );
485 for( i = 0; i < 3; i++ ) {
486 uheader = "@MRC==len==*---timestamp---*";
487 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
488 payload = "this is a test and just a test if it were not a test you'd be notified";
489 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
494 logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
496 logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
497 for( i = 0; i < 13; i++ ) {
498 uheader = "@MRC==len==*---timestamp---*";
499 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
500 payload = "this is a test and just a test if it were not a test you'd be notified";
501 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
506 rdc_close( ctx ); // force move of the current file before exit