049508ca6331678af66e85ed59b6408551af46d2
[ric-app/mc.git] / sidecars / listener / src / rdc.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       rdc.c.
22         Abstract:       This module contains library functions which implement the
23                                 raw data collection (rdc) portion of the listener. The fanout
24                                 function in the library will call these functions to capture
25                                 raw messages, with a header, for later "replay" or other
26                                 analysis.  Messages are captured as they are written to the
27                                 FIFO, with a small header added:
28                                         @RDC<mtype><len>
29
30                                 Where <mtype> is the message type of the message received and
31                                 <len> is the length of the data that was written to the FIFO.
32
33                 
34         Date:           06 Oct 2019
35         Author:         E. Scott Daniels
36 */
37
38 #include <stdarg.h>
39 #include <errno.h>
40 #include <stdio.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <string.h>
46 #include <sys/stat.h>
47
48 #include "mcl.h"
49
50 /*
51         A capture buffer. The listener writes FIFO output in two stages, thus
52         we provide the ability to initialise a capture with the msg type and
53         the MRL header, then to write the payload using this saved data. The
54         idea is to not require the caller to save the header.
55 */
56 typedef struct {
57         char    uheader[100];   // user header (max 100 bytes)
58         int             uhlen;                  // length of user header
59         int             mtype;                  // message type
60 } cap_buf_t;
61
62 typedef struct {
63         int             flags;                  // DFFL_* constatnts
64         int             frequency;              // the frequency at which files are rolled
65         int             fd;                             // current open write file
66         char*   sdir;                   // staging directory
67         char*   fdir;                   // final directory
68         char*   suffix;                 // suffix for final file (must include . if needed)
69         char*   dsuffix;                // suffix for done file
70         char*   basename;               // base name of the file being written to
71         char*   openname;               // full filename that is open for writing
72         char*   source;                 // added to output file names to differentiate the source
73         time_t  next_roll;              // time we should roll the file
74 } rdc_ctx_t;
75
76 #define RDC_DELIM       "@RDC"          // delimeter used in our file
77
78 // -------------------------------------------------------------------------------------------
79
80 /*
81         Copy and unlink old file is successful. During writing the file mode will
82         be write only for the owner (0200). If the mode passed in is not 0, then
83         just prior to renaming the file to 'new', the mode will be changed. If
84         mode is 0, then we assume the caller will change the file mode when
85         appropriate.
86
87         There seems to be an issue with some collectors and thus it is required
88         to initially name the file with a leading dot (.) until the file is closed
89         and ready to be read by external processes (marking it write-only seems
90         not to discourage them from trying!).
91 */
92 static int copy_unlink( char* old, char* new, int mode ) {
93         char    buf[8192];                      // read buffer
94         char*   tfname = NULL;          // temp file name while we have it open
95         char*   wbuf;                           // work buffer for disecting the new filename
96         char*   tok;                            // token pointer into a buffer
97         int     len;
98         int     rfd;            // read/write file descriptors
99         int     wfd;
100         int start;
101         int     state;
102         int remain;             // number of bytes remaining to write
103
104
105         errno = 0;
106         if( (rfd = open( old, O_RDONLY )) < 0 ) {
107                 logit( LOG_ERR, "copy: open src for copy failed: %s: %s", old, strerror( errno ) );
108                 return -1;
109         }
110
111         len = sizeof( char ) * (strlen( new ) + 2 );                    // space needed for temp file name with added .
112         tfname = (char *) malloc( len );
113         wbuf = strdup( new );                                                                                                   // we need to trash the string, so copy
114         tok = strrchr( wbuf, '/' );                                                                                             // find end of path
115         if( tok ) {
116                 *tok = 0;
117                 tok++;
118                 snprintf( tfname, len, "%s/.%s", wbuf, tok );                                                   // insert . to "hide" from collector
119         } else {
120                 snprintf( tfname, len, ".%s", wbuf );                                                                   // no path, just add leading .
121         }
122         free( wbuf );
123         //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
124
125         if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
126                 logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s", tfname, strerror( errno ) );
127                 return -1;
128         }
129
130         while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
131                 remain = len;
132                 start = 0;
133                 while( remain > 0 ) {
134                         errno = 0;
135                         if( (len = write( wfd, &buf[start], len )) != remain ) {                // short write
136                                 if( errno != EINTR && errno != EAGAIN ) {
137                                         logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
138                                         free( tfname );
139                                         close( wfd );
140                                         close( rfd );
141                                         return -1;
142                                 }
143                         }
144
145                         remain -= len;          // recompute what we need to write, and try again
146                         start += len;
147                 }
148         }
149
150         state = close( wfd );
151         close( rfd );
152
153         if( state < 0 ) {
154                 logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
155         } else {
156                 if( mode != 0 ) {
157                         chmod( tfname, mode );
158                 }
159                 //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
160                 if( (state = rename( tfname, new )) < 0 ) {
161                         logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
162                 } else {
163                         if( unlink( old ) < 0 ) {
164                                 logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
165                         }
166                 }
167         }
168
169         free( tfname );
170         return state < 0 ? -1 : 0;
171 }
172
173 /*
174         Attempt to rename (move) the old file to the new file. If that fails with
175         the error EXDEV (not on same device), then we will copy the file. All
176         other errors returned by the rename() command are considered fatal and
177         copy is not attempted.  Returns 0 on success, -1 with errno set on
178         failure. See man page for rename for possible errno values and meanings.
179 */
180 static int mvocp( char* old, char* new ) {
181
182         chmod( old, 0644 );                                                             // give it proper mode before moving it
183
184         if( rename( old, new ) < 0 ) {
185                 if( errno != EXDEV ) {
186                         logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
187                         return -1;
188                 }
189
190                 return copy_unlink( old, new, 0644 );
191         }
192
193         return 0;
194 }
195
196 /*
197         Opens a new file for writing. Returns the fd. The context fname field will
198         in the context point to the basename (no suffix) on return.
199 */
200 static int rdc_open( void* vctx ) {
201         char    fname[2048];
202         char    basename[2048];
203         int             fd;
204         time_t  ts;                             // time stamp, rounded to previous frequency
205         rdc_ctx_t* ctx;
206
207         ctx = (rdc_ctx_t *) vctx;
208
209
210         if( ctx == NULL ) {
211                 return -1;
212         }
213
214         ts = time( NULL );
215         ts = ts - (ts % ctx->frequency);                        // round to previous frequency
216         ctx->next_roll = ts + ctx->frequency;           // set next time to roll the file
217
218         snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts );            // basename needed to build final file name at close
219         snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
220         fd = open( fname, O_WRONLY | O_CREAT, 0200 );           // open in w-- mode so that it should not be readable
221         if( fd < 0 ) {
222                 logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
223                 return fd;                                                                              // leave errno set by open attempt
224         }
225
226         lseek( fd, 0, SEEK_END );                                                       // if file existed, continue appending to it
227         logit( LOG_INFO, "(rdf) now writing to: %s", fname );
228         ctx->openname = strdup( fname );
229         ctx->basename = strdup( basename );
230         ctx->fd = fd;
231
232         return fd;
233 }
234
235 // ------------------ public things -------------------------------------------------------
236
237 /*
238         A generic log function such that if pressed to write logs using the log
239         library which yields completely unreadable json messages, we can comply
240         yet still have readable messages when testing. If the message type LOG_STAT
241         then the message is written to stdout rather than stderr.
242 */
243 extern void logit( int priority, char *fmt, ... ) {
244     va_list argp;
245     char ubuf[4*1024+1];                                // build user message here
246     char* pstr = "UNK";                                 // priority string
247         FILE*   dest = stderr;                          // where to write the message
248
249         switch( priority ) {
250                 case LOG_STAT:
251                         pstr = "STAT";
252                         dest = stdout;
253                         break;
254
255                 case LOG_CRIT:
256                         pstr = "CRI";
257                         break;
258
259                 case LOG_ERR:
260                         pstr = "ERR";
261                         break;
262
263                 case LOG_WARN:
264                         pstr = "WARN";
265                         break;
266
267                 default:
268                         pstr = "INFO";
269                         break;
270         }
271
272     va_start( argp, fmt );                                                              // point at first variable arguement
273     vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp );    // build the user portion of the message
274     va_end( argp );
275
276     fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
277 }
278
279
280 /*
281         Initialise the raw data collection facility.  The two directories, staging and final, may be the
282         same, and if different they _should_ reside on the same filesystem such that a move
283         (rename) operation is only the change in the directory and does not involve the copy
284         of bytes.
285
286         Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
287         we assume that once the file is moved from staging to final, we create an empty
288         "done" file using the dsuffix.
289
290         A pointer to the context is returned; nil on error with errno set to some useful
291         (we hope) value.
292 */
293 extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
294         rdc_ctx_t*      ctx;
295         char*           ep;                     // pointer at environment var value
296
297         ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
298         if( ctx == NULL ) {
299                 errno = ENOMEM;
300                 return NULL;
301         }
302         memset( ctx, 0, sizeof( *ctx ) );
303
304         if( sdir == NULL ) {
305                 if( fdir == NULL ) {
306                         errno = EINVAL;         // must have at least one of these
307                         free( ctx );
308                         return NULL;
309                 }
310
311                 ctx->sdir = strdup( fdir );
312                 ctx->fdir = strdup( fdir );
313         } else {
314                 ctx->sdir = strdup( sdir );
315                 if( fdir == NULL ) {
316                         ctx->fdir = strdup( sdir );
317                 } else {
318                         ctx->fdir = strdup( fdir );
319                 }
320
321         }
322
323         if( suffix ) {
324                 ctx->suffix = strdup( suffix );
325         }
326         if( dsuffix ) {
327                 ctx->dsuffix = strdup( dsuffix );
328         }
329
330         if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
331                 ctx->source = strdup( ep );
332         } else {
333                 ctx->source = strdup( "" );
334         }
335
336         ctx->frequency = 300;                   // default to 5 min roll
337         ctx->fd = -1;
338
339         return (void *) ctx;
340 }
341
342 /*
343         Allow the file rotation frequency to be set to something other
344         than the default.  A minimum of 10s is enforced, but there is
345         no maximum.
346 */
347 extern void rdc_set_freq( void* vctx, int freq ) {
348         rdc_ctx_t* ctx;
349
350         ctx = (rdc_ctx_t *) vctx;
351
352         if( ctx != NULL && freq >= 10 ) {
353                 ctx->frequency = freq;
354                 logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
355         } else {
356                 logit( LOG_ERR, "(rdc) file roll frequency was not set; ctx was nill or frequency was less than 10s" );
357         }
358 }
359
360 /*
361         Close the currently open file and move it to it's final resting place in fdir.
362         If the done suffix in the context is not nil, then we touch a done file which
363         has the same basename with the done suffix; this is also placed into the final
364         dir.
365 */
366 extern void rdc_close( void* vctx ) {
367         char    target[2048];
368         char*   t_suffix;
369         int             fd;
370         rdc_ctx_t* ctx;
371
372         ctx = (rdc_ctx_t *) vctx;
373         if( ctx == NULL || ctx->fd < 0 ) {
374                 return;
375         }
376
377         close( ctx->fd );               // future -- check for error
378         ctx->fd = -1;
379
380         t_suffix =  ctx->suffix != NULL  ? ctx->suffix : "";
381         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix );            // final target name
382         if( mvocp( ctx->openname, target ) < 0 ) {
383                 logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
384         } else {
385                 logit( LOG_INFO, "capture file closed and moved to: %s", target );
386                 if( ctx->dsuffix != NULL ) {                            // must also create a done file
387                         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
388                         if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
389                                 close( fd );
390                                 logit( LOG_INFO, "created done file: %s", target );
391                         } else {
392                                 logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
393                         }
394                 }
395         }
396
397         free( ctx->basename );
398         free( ctx->openname );
399         ctx->basename = NULL;
400         ctx->openname = NULL;
401 }
402
403 /*
404         Writes the payload using the previously initialised capture buffer.
405         If it's time to roll the file, or the file isn't opened, the needed housekeeping
406         is done first.
407 */
408 extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
409         cap_buf_t* cb;
410         char    header[100];                                    // our header
411         rdc_ctx_t* ctx;
412
413         cb = (cap_buf_t *) vcb;
414         ctx = (rdc_ctx_t *) vctx;
415         if( ctx == NULL || cb == NULL) {
416                 errno = EINVAL;
417                 return -1;
418         }
419
420         if( time( NULL ) >= ctx->next_roll ) {
421                 rdc_close( ctx );                                       // close up the current one
422         }
423
424         if( ctx->fd < 0 ) {
425                 rdc_open( ctx );                                        // need a file, get it open
426         }
427
428         snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
429         write( ctx->fd, header, 20 );
430         write( ctx->fd, cb->uheader, cb->uhlen );
431         write( ctx->fd, payload, len );
432
433         return 0;               // future -- check and return error
434 }
435
436 /*
437         Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
438         We save the message type, and will use that and the user header length and payload
439         length on write to create the complete RDC header.
440 */
441 extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
442         cap_buf_t* cb;
443
444         cb = (cap_buf_t *) vcb;
445         if( cb == NULL ) {
446                 cb = (cap_buf_t *) malloc( sizeof( *cb ) );
447                 if( cb == NULL ) {
448                         errno = ENOMEM;
449                         return NULL;
450                 }
451         }
452
453         cb->mtype = mtype;
454         if(  uhlen > sizeof( cb->uheader ) ) {
455                 uhlen = sizeof( uheader );
456         }
457         memcpy( cb->uheader, uheader, uhlen );
458         cb->uhlen = uhlen;
459
460         return (void *) cb;
461 }
462
463 #ifdef SELF_TEST
464 /*
465         Run some quick checks which require the directories in /tmp to exist, and some
466         manual verification on the part of the tester.
467 */
468 int main( ) {
469         void*   ctx;                    // context
470         void*   cb = NULL;              // capture buffere
471         char*   uheader;
472         char*   payload;
473         int             i;
474
475         ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL );                     // does not create done files
476         //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" );        // will create a "done" file
477         if( ctx == NULL ) {
478                 logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer" );
479                 exit( 1 );
480         }
481
482         rdc_set_freq( ctx, 60 );
483
484         logit( LOG_INFO, "<TEST> writing three messages" );
485         for( i = 0; i < 3; i++ ) {
486                 uheader = "@MRC==len==*---timestamp---*";
487                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
488                 payload = "this is a test and just a test if it were not a test you'd be notified";
489                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
490
491                 sleep( 1 );
492         }
493
494         logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
495         sleep( 80 );
496         logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
497         for( i = 0; i < 13; i++ ) {
498                 uheader = "@MRC==len==*---timestamp---*";
499                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
500                 payload = "this is a test and just a test if it were not a test you'd be notified";
501                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
502
503                 sleep( 1 );
504         }
505
506         rdc_close( ctx );                       // force move of the current file before exit
507 }
508 #endif