Add full unit tests for listener
[ric-app/mc.git] / sidecars / listener / rdc.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       rdc.c.
22         Abstract:       This module contains library functions which implement the
23                                 raw data collection (rdc) portion of the listener. The fanout
24                                 function in the library will call these functions to capture
25                                 raw messages, with a header, for later "replay" or other
26                                 analysis.  Messages are captured as they are written to the
27                                 FIFO, with a small header added:
28                                         @RDC<mtype><len>
29
30                                 Where <mtype> is the message type of the message received and
31                                 <len> is the length of the data that was written to the FIFO.
32
33                                 
34         Date:           06 Oct 2019
35         Author:         E. Scott Daniels
36 */
37
38 #include <stdarg.h>
39 #include <errno.h> 
40 #include <stdio.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <string.h>
46 #include <sys/stat.h>
47
48 #include "mcl.h"
49
50 /*
51         A capture buffer. The listener writes FIFO output in two stages, thus
52         we provide the ability to initialise a capture with the msg type and 
53         the MRL header, then to write the payload using this saved data. The
54         idea is to not require the caller to save the header.
55 */
56 typedef struct {
57         char    uheader[100];   // user header (max 100 bytes)
58         int             uhlen;                  // length of user header
59         int             mtype;                  // message type
60 } cap_buf_t;
61
62 typedef struct {
63         int             flags;                  // DFFL_* constatnts
64         int             frequency;              // the frequency at which files are rolled
65         int             fd;                             // current open write file
66         char*   sdir;                   // staging directory
67         char*   fdir;                   // final directory
68         char*   suffix;                 // suffix for final file (must include . if needed)
69         char*   dsuffix;                // suffix for done file
70         char*   basename;               // base name of the file being written to
71         char*   openname;               // full filename that is open for writing
72         char*   source;                 // added to output file names to differentiate the source
73         time_t  next_roll;              // time we should roll the file
74 } rdc_ctx_t;
75
76 #define RDC_DELIM       "@RDC"          // delimeter used in our file
77
78 // -------------------------------------------------------------------------------------------
79
80 /*
81         Copy and unlink old file is successful. During writing the file mode will
82         be write only for the owner (0200). If the mode passed in is not 0, then
83         just prior to renaming the file to 'new', the mode will be changed. If
84         mode is 0, then we assume the caller will change the file mode when 
85         appropriate.
86
87         There seems to be an issue with some collectors and thus it is required 
88         to initially name the file with a leading dot (.) until the file is closed
89         and ready to be read by external processes (marking it write-only seems
90         not to discourage them from trying!).
91 */
92 static int copy_unlink( char* old, char* new, int mode ) {
93         char    buf[8192];      // read buffer
94         char*   tfname;         // temp file name while we have it open
95         char*   wbuf;           // work buffer for disecting the new filename
96         char*   tok;            // token pointer into a buffer
97         int     len;
98         int     rfd;            // read/write file descriptors
99         int     wfd;
100         int start;
101         int     state;
102         int remain;             // number of bytes remaining to write
103
104         
105         errno = 0;
106         if( (rfd = open( old, O_RDONLY )) < 0 ) {
107                 logit( LOG_ERR, "copy: open src for copy failed: %s: %s", old, strerror( errno ) );
108                 return -1;
109         }
110
111         len = sizeof( char ) * (strlen( new ) + 2 );                    // space needed for temp file name with added .
112         tfname = (char *) malloc( len );
113         wbuf = strdup( new );                                                                                                   // we need to trash the string, so copy
114         tok = strrchr( wbuf, '/' );                                                                                             // find end of path
115         if( tok ) {
116                 *tok = 0;
117                 tok++;
118                 snprintf( tfname, len, "%s/.%s", wbuf, tok );                                                   // insert . to "hide" from collector
119         } else {
120                 snprintf( tfname, len, ".%s", wbuf );                                                                   // no path, just add leading .
121         }
122         //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
123
124         if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
125                 logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s", tfname, strerror( errno ) );
126                 return -1;
127         }
128
129         while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
130                 remain = len;
131                 start = 0;
132                 while( remain > 0 ) {
133                         errno = 0;
134                         if( (len = write( wfd, &buf[start], len )) != remain ) {                // short write
135                                 if( errno != EINTR && errno != EAGAIN ) {
136                                         logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
137                                         close( wfd );
138                                         close( rfd );
139                                         return -1;
140                                 }
141                         }
142
143                         remain -= len;          // recompute what we need to write, and try again
144                         start += len;
145                 }
146         }
147
148         state = close( wfd );
149         close( rfd );
150
151         if( state < 0 ) {
152                 logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
153         } else {
154                 if( mode != 0 ) {
155                         chmod( tfname, mode );
156                 }
157                 //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
158                 if( (state = rename( tfname, new )) < 0 ) {
159                         logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
160                 } else {
161                         if( unlink( old ) < 0 ) {
162                                 logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
163                         }
164                 }
165         }
166
167         return state < 0 ? -1 : 0;
168 }
169
170 /*
171         Attempt to rename (move) the old file to the new file. If that fails with
172         the error EXDEV (not on same device), then we will copy the file. All
173         other errors returned by the rename() command are considered fatal and
174         copy is not attempted.  Returns 0 on success, -1 with errno set on 
175         failure. See man page for rename for possible errno values and meanings.
176 */
177 static int mvocp( char* old, char* new ) {
178
179         chmod( old, 0644 );                                                             // give it proper mode before moving it
180
181         if( rename( old, new ) < 0 ) {
182                 if( errno != EXDEV ) {
183                         logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
184                         return -1;
185                 }
186
187                 return copy_unlink( old, new, 0644 );
188         }
189         
190         return 0;
191 }
192
193 /*
194         Opens a new file for writing. Returns the fd. The context fname field will
195         in the context point to the basename (no suffix) on return.
196 */
197 static int rdc_open( void* vctx ) {
198         char    fname[2048];
199         char    basename[2048];
200         int             fd;
201         time_t  ts;                             // time stamp, rounded to previous frequency
202         rdc_ctx_t* ctx;
203
204         ctx = (rdc_ctx_t *) vctx;
205         
206         
207         if( ctx == NULL ) {
208                 return -1;
209         }
210
211         ts = time( NULL );
212         ts = ts - (ts % ctx->frequency);                        // round to previous frequency
213         ctx->next_roll = ts + ctx->frequency;           // set next time to roll the file
214
215         snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts );            // basename needed to build final file name at close
216         snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
217         fd = open( fname, O_WRONLY | O_CREAT, 0200 );           // open in w-- mode so that it should not be readable
218         if( fd < 0 ) {
219                 logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
220                 return fd;                                                                              // leave errno set by open attempt
221         }
222
223         lseek( fd, 0, SEEK_END );                                                       // if file existed, continue appending to it
224         logit( LOG_INFO, "(rdf) now writing to: %s", fname );
225         ctx->openname = strdup( fname );
226         ctx->basename = strdup( basename );
227         ctx->fd = fd;
228
229         return fd;
230 }
231
232 // ------------------ public things -------------------------------------------------------
233
234 /*
235         A generic log function such that if pressed to write logs using the log
236         library which yields completely unreadable json messages, we can comply
237         yet still have readable messages when testing. If the message type LOG_STAT
238         then the message is written to stdout rather than stderr.
239 */
240 extern void logit( int priority, char *fmt, ... ) {
241     va_list argp;
242     char ubuf[4*1024+1];                                // build user message here
243     char* pstr = "UNK";                                 // priority string
244         FILE*   dest = stderr;                          // where to write the message
245
246         switch( priority ) {
247                 case LOG_STAT:
248                         pstr = "STAT";
249                         dest = stdout;
250                         break;
251
252                 case LOG_CRIT:
253                         pstr = "CRI";
254                         break;
255
256                 case LOG_ERR:
257                         pstr = "ERR";
258                         break;
259
260                 case LOG_WARN:
261                         pstr = "WARN";
262                         break;
263
264                 default:
265                         pstr = "INFO";
266                         break;
267         }
268
269     va_start( argp, fmt );                                                              // point at first variable arguement
270     vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp );    // build the user portion of the message
271     va_end( argp );
272
273     fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
274 }
275
276
277 /*
278         Initialise the raw data collection facility.  The two directories, staging and final, may be the 
279         same, and if different they _should_ reside on the same filesystem such that a move
280         (rename) operation is only the change in the directory and does not involve the copy
281         of bytes.
282
283         Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
284         we assume that once the file is moved from staging to final, we create an empty
285         "done" file using the dsuffix.
286
287         A pointer to the context is returned; nil on error with errno set to some useful
288         (we hope) value.
289 */
290 extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
291         rdc_ctx_t*      ctx;
292         char*           ep;                     // pointer at environment var value
293
294         ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
295         if( ctx == NULL ) {
296                 errno = ENOMEM;
297                 return NULL;
298         }
299         memset( ctx, 0, sizeof( *ctx ) );
300
301         if( sdir == NULL ) {
302                 if( fdir == NULL ) {
303                         errno = EINVAL;         // must have at least one of these
304                         free( ctx );
305                         return NULL;
306                 }
307
308                 ctx->sdir = strdup( fdir );
309                 ctx->fdir = strdup( fdir );
310         } else {
311                 ctx->sdir = strdup( sdir );
312                 if( fdir == NULL ) {
313                         ctx->fdir = strdup( sdir );
314                 } else {
315                         ctx->fdir = strdup( fdir );
316                 }
317                 
318         }
319
320         if( suffix ) {
321                 ctx->suffix = strdup( suffix );
322         }
323         if( dsuffix ) {
324                 ctx->dsuffix = strdup( dsuffix );
325         }
326
327         if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
328                 ctx->source = strdup( ep );
329         } else {
330                 ctx->source = strdup( "" );
331         }
332
333         ctx->frequency = 300;                   // default to 5 min roll
334         ctx->fd = -1;
335
336         return (void *) ctx;
337 }
338
339 /*
340         Allow the file rotation frequency to be set to something other
341         than the default.  A minimum of 10s is enforced, but there is 
342         no maximum.
343 */
344 extern void rdc_set_freq( void* vctx, int freq ) {
345         rdc_ctx_t* ctx;
346
347         ctx = (rdc_ctx_t *) vctx;
348         
349         if( ctx != NULL && freq >= 10 ) {
350                 ctx->frequency = freq;
351                 logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
352         } else {
353                 logit( LOG_ERR, "(rdc) file roll frequency was not set; ctx was nill or frequency was less than 10s" );
354         }
355 }
356
357 /*
358         Close the currently open file and move it to it's final resting place in fdir.
359         If the done suffix in the context is not nil, then we touch a done file which
360         has the same basename with the done suffix; this is also placed into the final
361         dir.
362 */
363 extern void rdc_close( void* vctx ) {
364         char    target[2048];
365         char*   t_suffix;
366         int             fd;
367         rdc_ctx_t* ctx;
368
369         ctx = (rdc_ctx_t *) vctx;
370         if( ctx == NULL || ctx->fd < 0 ) {
371                 return;
372         }
373
374         close( ctx->fd );               // future -- check for error
375         ctx->fd = -1;
376
377         t_suffix =  ctx->suffix != NULL  ? ctx->suffix : "";
378         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix );            // final target name
379         if( mvocp( ctx->openname, target ) < 0 ) {
380                 logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
381         } else {
382                 logit( LOG_INFO, "capture file closed and moved to: %s", target );
383                 if( ctx->dsuffix != NULL ) {                            // must also create a done file
384                         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
385                         if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
386                                 close( fd );
387                                 logit( LOG_INFO, "created done file: %s", target );
388                         } else {
389                                 logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
390                         }
391                 }
392         }
393
394         free( ctx->basename );
395         free( ctx->openname );
396         ctx->basename = NULL;
397         ctx->openname = NULL;
398 }
399
400 /*
401         Writes the payload using the previously initialised capture buffer.
402         If it's time to roll the file, or the file isn't opened, the needed housekeeping
403         is done first.
404 */
405 extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
406         cap_buf_t* cb;
407         char    header[100];                                    // our header
408         rdc_ctx_t* ctx;
409
410         cb = (cap_buf_t *) vcb;
411         ctx = (rdc_ctx_t *) vctx;
412         if( ctx == NULL || cb == NULL) {
413                 errno = EINVAL;
414                 return -1;
415         }
416
417         if( time( NULL ) >= ctx->next_roll ) {
418                 rdc_close( ctx );                                       // close up the current one
419         }
420
421         if( ctx->fd < 0 ) {
422                 rdc_open( ctx );                                        // need a file, get it open
423         }       
424
425         snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
426         write( ctx->fd, header, 20 );
427         write( ctx->fd, cb->uheader, cb->uhlen );
428         write( ctx->fd, payload, len );
429
430         return 0;               // future -- check and return error
431 }
432
433 /*
434         Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
435         We save the message type, and will use that and the user header length and payload
436         length on write to create the complete RDC header.
437 */
438 extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) { 
439         cap_buf_t* cb;
440
441         cb = (cap_buf_t *) vcb;
442         if( cb == NULL ) {
443                 cb = (cap_buf_t *) malloc( sizeof( *cb ) );
444                 if( cb == NULL ) {
445                         errno = ENOMEM;
446                         return NULL;
447                 }
448         }       
449
450         cb->mtype = mtype;
451         if(  uhlen > sizeof( cb->uheader ) ) {
452                 uhlen = sizeof( uheader );
453         }
454         memcpy( cb->uheader, uheader, uhlen );
455         cb->uhlen = uhlen;
456
457         return (void *) cb;
458 }
459
460 #ifdef SELF_TEST
461 /*
462         Run some quick checks which require the directories in /tmp to exist, and some 
463         manual verification on the part of the tester.
464 */
465 int main( ) {
466         void*   ctx;                    // context
467         void*   cb = NULL;              // capture buffere
468         char*   uheader;
469         char*   payload;
470         int     i;
471
472         ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL );                     // does not create done files
473         //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" );        // will create a "done" file
474         if( ctx == NULL ) {
475                 logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer" );
476                 exit( 1 );
477         }
478
479         rdc_set_freq( ctx, 60 );
480
481         logit( LOG_INFO, "<TEST> writing three messages" );
482         for( i = 0; i < 3; i++ ) {
483                 uheader = "@MRC==len==*---timestamp---*";
484                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
485                 payload = "this is a test and just a test if it were not a test you'd be notified";
486                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
487
488                 sleep( 1 );
489         }
490
491         logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
492         sleep( 80 );
493         logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
494         for( i = 0; i < 13; i++ ) {
495                 uheader = "@MRC==len==*---timestamp---*";
496                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
497                 payload = "this is a test and just a test if it were not a test you'd be notified";
498                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
499
500                 sleep( 1 );
501         }
502
503         rdc_close( ctx );                       // force move of the current file before exit
504 }
505 #endif