1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: This module contains library functions which implement the
23 raw data collection (rdc) portion of the listener. The fanout
24 function in the library will call these functions to capture
25 raw messages, with a header, for later "replay" or other
26 analysis. Messages are captured as they are written to the
27 FIFO, with a small header added: @RDC<mtype>*<len>
30 Where <mtype> is the message type of the message received and
31 <len> is the length of the data that was written to the FIFO.
35 Author: E. Scott Daniels
51 A capture buffer. The listener writes FIFO output in two stages, thus
52 we provide the ability to initialise a capture with the msg type and
53 the MRL header, then to write the payload using this saved data. The
54 idea is to not require the caller to save the header.
// Capture buffer fields: filled by rdc_init_buf() and consumed by rdc_write().
57 char uheader[100]; // user header (max 100 bytes); written between the RDC header and the payload
58 int uhlen; // length of user header; added to the payload length in the RDC header
59 int mtype; // message type; formatted into the RDC header
// Context fields: everything the rdc functions need to name, roll and move capture files.
63 int flags; // DFFL_* constants
64 int frequency; // the frequency (seconds) at which files are rolled
65 int fd; // current open write file; a negative value is treated as "no file open"
66 char* sdir; // staging directory
67 char* fdir; // final directory
68 char* suffix; // suffix for final file (must include . if needed); may be nil
69 char* dsuffix; // suffix for done file; when nil no done file is created
70 char* basename; // base name of the file being written to
71 char* openname; // full filename that is open for writing
72 char* source; // added to output file names to differentiate the source (from MCL_RDC_SOURCE)
73 time_t next_roll; // time we should roll the file
76 #define RDC_DELIM "@RDC" // delimiter used in our file; it begins each record header
78 // -------------------------------------------------------------------------------------------
81 Copy the old file to new and unlink the old file if the copy is successful. During writing the file mode will
82 be write only for the owner (0200). If the mode passed in is not 0, then
83 just prior to renaming the file to 'new', the mode will be changed. If
84 mode is 0, then we assume the caller will change the file mode when
87 There seems to be an issue with some collectors and thus it is required
88 to initially name the file with a leading dot (.) until the file is closed
89 and ready to be read by external processes (marking it write-only seems
90 not to discourage them from trying!).
92 static int copy_unlink( char* old, char* new, int mode ) {
93 char buf[8192]; // read buffer
94 char* tfname; // temp file name while we have it open
95 char* wbuf; // work buffer for disecting the new filename
96 char* tok; // token pointer into a buffer
98 int rfd; // read/write file descriptors
102 int remain; // number of bytes remaining to write
106 if( (rfd = open( old, O_RDONLY )) < 0 ) {
107 logit( LOG_ERR, "copy: open src for copy failed: %s: %s", old, strerror( errno ) );
111 len = sizeof( char ) * (strlen( new ) + 2 ); // space needed for temp file name with added .
112 tfname = (char *) malloc( len );
113 wbuf = strdup( new ); // we need to trash the string, so copy
114 tok = strrchr( wbuf, '/' ); // find end of path
118 snprintf( tfname, len, "%s/.%s", wbuf, tok ); // insert . to "hide" from collector
120 snprintf( tfname, len, ".%s", wbuf ); // no path, just add leading .
122 //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
124 if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
125 logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s", tfname, strerror( errno ) );
129 while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
132 while( remain > 0 ) {
134 if( (len = write( wfd, &buf[start], len )) != remain ) { // short write
135 if( errno != EINTR && errno != EAGAIN ) {
136 logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
143 remain -= len; // recompute what we need to write, and try again
148 state = close( wfd );
152 logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
155 chmod( tfname, mode );
157 //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
158 if( (state = rename( tfname, new )) < 0 ) {
159 logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
161 if( unlink( old ) < 0 ) {
162 logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
167 return state < 0 ? -1 : 0;
171 Attempt to rename (move) the old file to the new file. If that fails with
172 the error EXDEV (not on same device), then we will copy the file. All
173 other errors returned by the rename() command are considered fatal and
174 copy is not attempted. Returns 0 on success, -1 with errno set on
175 failure. See man page for rename for possible errno values and meanings.
177 static int mvocp( char* old, char* new ) {
179 chmod( old, 0644 ); // give it proper mode before moving it
181 if( rename( old, new ) < 0 ) {
182 if( errno != EXDEV ) {
183 logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
187 return copy_unlink( old, new, 0644 );
194 Opens a new file for writing. Returns the fd. On return, the basename field
195 in the context will point to the basename (no suffix) of the file.
197 static int rdc_open( void* vctx ) {
201 time_t ts; // time stamp, rounded to previous frequency
204 ctx = (rdc_ctx_t *) vctx;
212 ts = ts - (ts % ctx->frequency); // round to previous frequency
213 ctx->next_roll = ts + ctx->frequency; // set next time to roll the file
215 snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts ); // basename needed to build final file name at close
216 snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
217 fd = open( fname, O_WRONLY | O_CREAT, 0200 ); // open in w-- mode so that it should not be readable
219 logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
220 return fd; // leave errno set by open attempt
223 lseek( fd, 0, SEEK_END ); // if file existed, continue appending to it
224 logit( LOG_INFO, "(rdf) now writing to: %s", fname );
225 ctx->openname = strdup( fname );
226 ctx->basename = strdup( basename );
232 // ------------------ public things -------------------------------------------------------
235 A generic log function such that if pressed to write logs using the log
236 library which yields completely unreadable json messages, we can comply
237 yet still have readable messages when testing. If the message type is LOG_STAT
238 then the message is written to stdout rather than stderr.
// Format a printf style message and write it as one line: timestamp, [priority string], user message.
// priority is one of the LOG_* constants; fmt and the variable arguments are as for printf.
240 extern void logit( int priority, char *fmt, ... ) {
242 char ubuf[4*1024+1]; // build user message here; vsnprintf truncates anything longer
243 char* pstr = "UNK"; // priority string; "UNK" is the default
244 FILE* dest = stderr; // where to write the message; stderr is the default
269 va_start( argp, fmt ); // point at first variable argument
270 vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp ); // build the user portion of the message
// NOTE(review): time() returns a time_t which is printed with %ld; a (long) cast, as used in rdc_open, would be safer -- confirm
273 fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
278 Initialise the raw data collection facility. The two directories, staging and final, may be the
279 same, and if different they _should_ reside on the same filesystem such that a move
280 (rename) operation is only the change in the directory and does not involve the copy
283 Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
284 we assume that once the file is moved from staging to final, we create an empty
285 "done" file using the dsuffix.
287 A pointer to the context is returned; nil on error with errno set to some useful
290 extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
292 char* ep; // pointer at environment var value
294 ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
299 memset( ctx, 0, sizeof( *ctx ) );
303 errno = EINVAL; // must have at least one of these
308 ctx->sdir = strdup( fdir );
309 ctx->fdir = strdup( fdir );
311 ctx->sdir = strdup( sdir );
313 ctx->fdir = strdup( sdir );
315 ctx->fdir = strdup( fdir );
321 ctx->suffix = strdup( suffix );
324 ctx->dsuffix = strdup( dsuffix );
327 if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
328 ctx->source = strdup( ep );
330 ctx->source = strdup( "" );
333 ctx->frequency = 300; // default to 5 min roll
340 Allow the file rotation frequency to be set to something other
341 than the default. A minimum of 10s is enforced, but there is
344 extern void rdc_set_freq( void* vctx, int freq ) {
347 ctx = (rdc_ctx_t *) vctx;
349 if( ctx != NULL && freq >= 10 ) {
350 ctx->frequency = freq;
351 logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
353 logit( LOG_ERR, "(rdc) file roll frequency was not set; ctx was nill or frequency was less than 10s" );
358 Close the currently open file and move it to its final resting place in fdir.
359 If the done suffix in the context is not nil, then we touch a done file which
360 has the same basename with the done suffix; this is also placed into the final
363 extern void rdc_close( void* vctx ) {
369 ctx = (rdc_ctx_t *) vctx;
370 if( ctx == NULL || ctx->fd < 0 ) {
374 close( ctx->fd ); // future -- check for error
377 t_suffix = ctx->suffix != NULL ? ctx->suffix : "";
378 snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix ); // final target name
379 if( mvocp( ctx->openname, target ) < 0 ) {
380 logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
382 logit( LOG_INFO, "capture file closed and moved to: %s", target );
383 if( ctx->dsuffix != NULL ) { // must also create a done file
384 snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
385 if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
387 logit( LOG_INFO, "created done file: %s", target );
389 logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
394 free( ctx->basename );
395 free( ctx->openname );
396 ctx->basename = NULL;
397 ctx->openname = NULL;
401 Writes the payload using the previously initialised capture buffer.
402 If it's time to roll the file, or the file isn't opened, the needed housekeeping
405 extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
407 char header[100]; // our header
410 cb = (cap_buf_t *) vcb;
411 ctx = (rdc_ctx_t *) vctx;
412 if( ctx == NULL || cb == NULL) {
417 if( time( NULL ) >= ctx->next_roll ) {
418 rdc_close( ctx ); // close up the current one
422 rdc_open( ctx ); // need a file, get it open
425 snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
426 write( ctx->fd, header, 20 );
427 write( ctx->fd, cb->uheader, cb->uhlen );
428 write( ctx->fd, payload, len );
430 return 0; // future -- check and return error
434 Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
435 We save the message type, and will use that and the user header length and payload
436 length on write to create the complete RDC header.
438 extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
441 cb = (cap_buf_t *) vcb;
443 cb = (cap_buf_t *) malloc( sizeof( *cb ) );
451 if( uhlen > sizeof( cb->uheader ) ) {
452 uhlen = sizeof( uheader );
454 memcpy( cb->uheader, uheader, uhlen );
462 Run some quick checks which require the directories in /tmp to exist, and some
463 manual verification on the part of the tester.
// Self test body: initialise a context, write three messages, wait for the roll period to pass,
// write thirteen more (which should land in a new file), then close to force the final move.
466 void* ctx; // context
467 void* cb = NULL; // capture buffer; nil on the first pass so that one is allocated, then reused
472 ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL ); // does not create done files
473 //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" ); // will create a "done" file
475 logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer" );
479 rdc_set_freq( ctx, 60 ); // roll every 60s so that the sleep below forces a new file
481 logit( LOG_INFO, "<TEST> writing three messages" );
482 for( i = 0; i < 3; i++ ) {
483 uheader = "@MRC==len==*---timestamp---*";
// NOTE(review): rdc_init_buf() is defined with four parameters (mtype, uheader, uhlen, vcb); the leading ctx argument looks like an error -- confirm
484 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
485 payload = "this is a test and just a test if it were not a test you'd be notified";
486 rdc_write( ctx, cb, payload, strlen( payload ) +1 ); // +1 captures the end of string byte too
491 logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
493 logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
494 for( i = 0; i < 13; i++ ) {
495 uheader = "@MRC==len==*---timestamp---*";
// NOTE(review): same extra ctx argument as in the first loop -- confirm
496 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
497 payload = "this is a test and just a test if it were not a test you'd be notified";
498 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
503 rdc_close( ctx ); // force move of the current file before exit