Reorganise the sidecars
[ric-app/mc.git] / sidecars / listener / rdc.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       rdc.c.
22         Abstract:       This module contains library functions which implement the
23                                 raw data collection (rdc) portion of the listener. The fanout
24                                 function in the library will call these functions to capture
25                                 raw messages, with a header, for later "replay" or other
26                                 analysis.  Messages are captured as they are written to the
27                                 FIFO, with a small header added:
28                                         @RDC<mtype><len>
29
30                                 Where <mtype> is the message type of the message received and
31                                 <len> is the length of the data that was written to the FIFO.
32
33                                 
34         Date:           06 Oct 2019
35         Author:         E. Scott Daniels
36 */
37
38 #include <stdarg.h>
39 #include <errno.h> 
40 #include <stdio.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <string.h>
46 #include <sys/stat.h>
47
48 #include "mcl.h"
49
50 /*
51         A capture buffer. The listener writes FIFO output in two stages, thus
52         we provide the ability to initialise a capture with the msg type and 
53         the MRL header, then to write the payload using this saved data. The
54         idea is to not require the caller to save the header.
55 */
56 typedef struct {
57         char    uheader[100];   // user header (max 100 bytes)
58         int             uhlen;                  // length of user header
59         int             mtype;                  // message type
60 } cap_buf_t;
61
62 typedef struct {
63         int             flags;                  // DFFL_* constatnts
64         int             frequency;              // the frequency at which files are rolled
65         int             fd;                             // current open write file
66         char*   sdir;                   // staging directory
67         char*   fdir;                   // final directory
68         char*   suffix;                 // suffix for final file (must include . if needed)
69         char*   dsuffix;                // suffix for done file
70         char*   basename;               // base name of the file being written to
71         char*   openname;               // full filename that is open for writing
72         char*   source;                 // added to output file names to differentiate the source
73         time_t  next_roll;              // time we should roll the file
74 } rdc_ctx_t;
75
76 #define RDC_DELIM       "@RDC"          // delimeter used in our file
77
78 // -------------------------------------------------------------------------------------------
79
80 /*
81         Copy and unlink old file is successful. During writing the file mode will
82         be write only for the owner (0200). If the mode passed in is not 0, then
83         just prior to renaming the file to 'new', the mode will be changed. If
84         mode is 0, then we assume the caller will change the file mode when 
85         appropriate.
86
87         There seems to be an issue with some collectors and thus it is required 
88         to initially name the file with a leading dot (.) until the file is closed
89         and ready to be read by external processes (marking it write only seems
90         not to discourage them from trying!).
91 */
92 static int copy_unlink( char* old, char* new, int mode ) {
93         char    buf[8192];      // read buffer
94         char*   tfname;         // temp file name while we have it open
95         char*   wbuf;           // work buffer for disecting the new filename
96         char*   tok;            // token pointer into a buffer
97         int     len;
98         int     rfd;            // read/write file descriptors
99         int     wfd;
100         int start;
101         int     state;
102         int remain;             // number of bytes remaining to write
103
104         
105         errno = 0;
106         if( (rfd = open( old, O_RDONLY )) < 0 ) {
107                 logit( LOG_ERR, "copy: open src for copy failed: %s: %s\n", old, strerror( errno ) );
108                 return -1;
109         }
110
111         len = sizeof( char ) * (strlen( new ) + 2 );                    // space needed for temp file name with added .
112         tfname = (char *) malloc( len );
113         wbuf = strdup( new );                                                                                                   // we need to trash the string, so copy
114         tok = strrchr( wbuf, '/' );                                                                                             // find end of path
115         if( tok ) {
116                 *tok = 0;
117                 tok++;
118                 snprintf( tfname, len, "%s/.%s", wbuf, tok );                                                   // insert . to "hide" from collector
119         } else {
120                 snprintf( tfname, len, ".%s", wbuf );                                                                   // no path, just add leading .
121         }
122         //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
123
124         if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
125                 logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s\n", tfname, strerror( errno ) );
126                 return -1;
127         }
128
129         while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
130                 remain = len;
131                 start = 0;
132                 while( remain > 0 ) {
133                         errno = 0;
134                         if( (len = write( wfd, &buf[start], len )) != remain ) {                // short write
135                                 if( errno != EINTR && errno != EAGAIN ) {
136                                         logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
137                                         close( wfd );
138                                         close( rfd );
139                                         return -1;
140                                 }
141                         }
142
143                         remain -= len;          // recompute what we need to write, and try again
144                         start += len;
145                 }
146         }
147
148         state = close( wfd );
149         close( rfd );
150
151         if( state < 0 ) {
152                 logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
153         } else {
154                 if( mode != 0 ) {
155                         chmod( tfname, mode );
156                 }
157                 //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
158                 if( (state = rename( tfname, new )) < 0 ) {
159                         logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
160                 } else {
161                         if( unlink( old ) < 0 ) {
162                                 logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
163                         }
164                 }
165         }
166
167         return state < 0 ? -1 : 0;
168 }
169
170 /*
171         Attempt to rename (move) the old file to the new file. If that fails with
172         the error EXDEV (not on same device), then we will copy the file. All
173         other errors returned by the rename() command are considered fatal and
174         copy is not attempted.  Returns 0 on success, -1 with errno set on 
175         failure. See man page for rename for possible errno values and meanings.
176 */
177 static int mvocp( char* old, char* new ) {
178
179         chmod( old, 0644 );                                                             // give it proper mode before moving it
180
181         if( rename( old, new ) < 0 ) {
182                 if( errno != EXDEV ) {
183                         logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
184                         return -1;
185                 }
186
187                 return copy_unlink( old, new, 0644 );
188         }
189         
190         return 0;
191 }
192
193 /*
194         Opens a new file for writing. Returns the fd. The context fname field will
195         in the context point to the basename (no suffix) on return.
196 */
197 static int rdc_open( void* vctx ) {
198         char    fname[2048];
199         char    basename[2048];
200         int             fd;
201         time_t  ts;                             // time stamp, rounded to previous frequency
202         rdc_ctx_t* ctx;
203
204         ctx = (rdc_ctx_t *) vctx;
205         
206         
207         if( ctx == NULL ) {
208                 return -1;
209         }
210
211         ts = time( NULL );
212         ts = ts - (ts % ctx->frequency);                        // round to previous frequency
213         ctx->next_roll = ts + ctx->frequency;           // set next time to roll the file
214
215         snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts );            // basename needed to build final file name at close
216         snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
217         fd = open( fname, O_WRONLY | O_CREAT, 0200 );           // open in w-- mode so that it should not be readable
218         if( fd < 0 ) {
219                 logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
220                 return fd;                                                                              // leave errno set by open attempt
221         }
222
223         lseek( fd, 0, SEEK_END );                                                       // if file existed, continue appending to it
224         logit( LOG_INFO, "(rdf) now writing to: %s", fname );
225         ctx->openname = strdup( fname );
226         ctx->basename = strdup( basename );
227         ctx->fd = fd;
228
229         return fd;
230 }
231
232 // ------------------ public things -------------------------------------------------------
233
234 /*
235         A generic log function such that if pressed to write logs using the log
236         library which yields completely unreadable json messages, we can comply
237         yet still have readable messages when testing. If the message type LOG_STAT
238         then the message is written to stdout rather than stderr.
239 */
240 extern void logit( int priority, char *fmt, ... ) {
241     va_list argp;
242     char ubuf[4*1024+1];                                // build user message here
243     char* pstr = "UNK";                                 // priority string
244         FILE*   dest = stderr;                          // where to write the message
245
246         switch( priority ) {
247                 case LOG_STAT:
248                         pstr = "STAT";
249                         dest = stdout;
250                         break;
251
252                 case LOG_CRIT:
253                         pstr = "CRI";
254                         break;
255
256                 case LOG_ERR:
257                         pstr = "ERR";
258                         break;
259
260                 case LOG_WARN:
261                         pstr = "WARN";
262                         break;
263
264                 default:
265                         pstr = "INFO";
266                         break;
267         }
268
269     va_start( argp, fmt );                                                              // point at first variable arguement
270     vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp );    // build the user portion of the message
271     va_end( argp );
272
273     fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
274 }
275
276
277 /*
278         Initialise the raw data collection facility.  The two directories, staging and final, may be the 
279         same, and if different they _should_ reside on the same filesystem such that a move
280         (rename) operation is only the change in the directory and does not involve the copy
281         of bytes.
282
283         Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
284         we assume that once the file is moved from staging to final, we create an empty
285         "done" file using the dsuffix.
286
287         A pointer to the context is returned; nil on error with errno set to some useful
288         (we hope) value.
289 */
290 extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
291         rdc_ctx_t*      ctx;
292         char*           ep;                     // pointer at environment var value
293
294         ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
295         if( ctx == NULL ) {
296                 errno = ENOMEM;
297                 return NULL;
298         }
299         memset( ctx, 0, sizeof( *ctx ) );
300
301         if( sdir == NULL ) {
302                 if( fdir == NULL ) {
303                         errno = EINVAL;         // must have at least one of these
304                         free( ctx );
305                         return NULL;
306                 }
307
308                 ctx->sdir = strdup( fdir );
309                 ctx->fdir = strdup( fdir );
310         } else {
311                 ctx->sdir = strdup( sdir );
312                 if( fdir == NULL ) {
313                         ctx->fdir = strdup( sdir );
314                 } else {
315                         ctx->fdir = strdup( fdir );
316                 }
317                 
318         }
319
320         if( suffix ) {
321                 ctx->suffix = strdup( suffix );
322         }
323         if( dsuffix ) {
324                 ctx->dsuffix = strdup( dsuffix );
325         }
326
327         if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
328                 ctx->source = strdup( ep );
329         } else {
330                 ctx->source = strdup( "" );
331         }
332
333         ctx->frequency = 300;                   // default to 5 min roll
334         ctx->fd = -1;
335
336         return (void *) ctx;
337 }
338
339 /*
340         Allow the file rotation frequency to be set to something other
341         than the default.  A minimum of 10s is enforced, but there is 
342         no maximum.
343 */
344 extern void rdc_set_freq( void* vctx, int freq ) {
345         rdc_ctx_t* ctx;
346
347         ctx = (rdc_ctx_t *) vctx;
348         
349         if( ctx != NULL && freq > 10 ) {
350                 ctx->frequency = freq;
351         }
352
353         logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
354 }
355
356 /*
357         Close the currently open file and move it to it's final resting place in fdir.
358         If the done suffix in the context is not nil, then we touch a done file which
359         has the same basename with the done suffix; this is also placed into the final
360         dir.
361 */
362 extern void rdc_close( void* vctx ) {
363         char    target[2048];
364         char*   t_suffix;
365         int             fd;
366         rdc_ctx_t* ctx;
367
368         ctx = (rdc_ctx_t *) vctx;
369         if( ctx == NULL || ctx->fd < 0 ) {
370                 return;
371         }
372
373         close( ctx->fd );               // future -- check for error
374         ctx->fd = -1;
375
376         t_suffix =  ctx->suffix != NULL  ? ctx->suffix : "";
377         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix );            // final target name
378         if( mvocp( ctx->openname, target ) < 0 ) {
379                 logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
380         } else {
381                 logit( LOG_INFO, "capture file closed and moved to: %s", target );
382                 if( ctx->dsuffix != NULL ) {                            // must also create a done file
383                         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
384                         if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
385                                 close( fd );
386                                 logit( LOG_INFO, "created done file: %s", target );
387                         } else {
388                                 logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
389                         }
390                 }
391         }
392
393         free( ctx->basename );
394         free( ctx->openname );
395         ctx->basename = NULL;
396         ctx->openname = NULL;
397 }
398
399 /*
400         Writes the payload using the previously initialised capture buffer.
401         If it's time to roll the file, or the file isn't opened, the needed housekeeping
402         is done first.
403 */
404 extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
405         cap_buf_t* cb;
406         char    header[100];                                    // our header
407         rdc_ctx_t* ctx;
408
409         cb = (cap_buf_t *) vcb;
410         ctx = (rdc_ctx_t *) vctx;
411         if( ctx == NULL || cb == NULL) {
412                 errno = EINVAL;
413                 return -1;
414         }
415
416         if( time( NULL ) >= ctx->next_roll ) {
417                 rdc_close( ctx );                                       // close up the current one
418         }
419
420         if( ctx->fd < 0 ) {
421                 rdc_open( ctx );                                        // need a file, get it open
422         }       
423
424         snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
425         write( ctx->fd, header, 20 );
426         write( ctx->fd, cb->uheader, cb->uhlen );
427         write( ctx->fd, payload, len );
428
429         return 0;               // future -- check and return error
430 }
431
432 /*
433         Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
434         We save the message type, and will use that and the user header length and payload
435         length on write to create the complete RDC header.
436 */
437 extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) { 
438         cap_buf_t* cb;
439
440         cb = (cap_buf_t *) vcb;
441         if( cb == NULL ) {
442                 cb = (cap_buf_t *) malloc( sizeof( *cb ) );
443                 if( cb == NULL ) {
444                         errno = ENOMEM;
445                         return NULL;
446                 }
447         }       
448
449         cb->mtype = mtype;
450         if(  uhlen > sizeof( cb->uheader ) ) {
451                 uhlen = sizeof( uheader );
452         }
453         memcpy( cb->uheader, uheader, uhlen );
454         cb->uhlen = uhlen;
455
456         return (void *) cb;
457 }
458
459 #ifdef SELF_TEST
460 /*
461         Run some quick checks which require the directories in /tmp to exist, and some 
462         manual verification on the part of the tester.
463 */
464 int main( ) {
465         void*   ctx;                    // context
466         void*   cb = NULL;              // capture buffere
467         char*   uheader;
468         char*   payload;
469         int     i;
470
471         ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL );                     // does not create done files
472         //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" );        // will create a "done" file
473         if( ctx == NULL ) {
474                 logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer\n" );
475                 exit( 1 );
476         }
477
478         rdc_set_freq( ctx, 60 );
479
480         logit( LOG_INFO, "<TEST> writing three messages" );
481         for( i = 0; i < 3; i++ ) {
482                 uheader = "@MRC==len==*---timestamp---*";
483                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
484                 payload = "this is a test and just a test if it were not a test you'd be notified";
485                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
486
487                 sleep( 1 );
488         }
489
490         logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
491         sleep( 80 );
492         logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
493         for( i = 0; i < 13; i++ ) {
494                 uheader = "@MRC==len==*---timestamp---*";
495                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
496                 payload = "this is a test and just a test if it were not a test you'd be notified";
497                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
498
499                 sleep( 1 );
500         }
501
502         rdc_close( ctx );                       // force move of the current file before exit
503 }
504 #endif