Fix CMake to reference unit test in correct dir
[ric-app/mc.git] / sidecars / listener / src / rdc.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       rdc.c.
22         Abstract:       This module contains library functions which implement the
23                                 raw data collection (rdc) portion of the listener. The fanout
24                                 function in the library will call these functions to capture
25                                 raw messages, with a header, for later "replay" or other
26                                 analysis.  Messages are captured as they are written to the
27                                 FIFO, with a small header added:
28                                         @RDC<mtype><len>
29
30                                 Where <mtype> is the message type of the message received and
31                                 <len> is the length of the data that was written to the FIFO.
32
33                         
34         Date:           06 Oct 2019
35         Author:         E. Scott Daniels
36 */
37
38 #include <stdarg.h>
39 #include <errno.h>
40 #include <stdio.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <string.h>
46 #include <sys/stat.h>
47
48 #include "mcl.h"
49
50 /*
51         A capture buffer. The listener writes FIFO output in two stages, thus
52         we provide the ability to initialise a capture with the msg type and
53         the MRL header, then to write the payload using this saved data. The
54         idea is to not require the caller to save the header.
55 */
56 typedef struct {
57         char    uheader[100];   // user header (max 100 bytes)
58         int             uhlen;                  // length of user header
59         int             mtype;                  // message type
60 } cap_buf_t;
61
62 typedef struct {
63         int             flags;                  // DFFL_* constatnts
64         int             frequency;              // the frequency at which files are rolled
65         int             fd;                             // current open write file
66         char*   sdir;                   // staging directory
67         char*   fdir;                   // final directory
68         char*   suffix;                 // suffix for final file (must include . if needed)
69         char*   dsuffix;                // suffix for done file
70         char*   basename;               // base name of the file being written to
71         char*   openname;               // full filename that is open for writing
72         char*   source;                 // added to output file names to differentiate the source
73         time_t  next_roll;              // time we should roll the file
74 } rdc_ctx_t;
75
76 #define RDC_DELIM       "@RDC"          // delimeter used in our file
77
78 // -------------------------------------------------------------------------------------------
79
80 /*
81         Copy and unlink old file is successful. During writing the file mode will
82         be write only for the owner (0200). If the mode passed in is not 0, then
83         just prior to renaming the file to 'new', the mode will be changed. If
84         mode is 0, then we assume the caller will change the file mode when
85         appropriate.
86
87         There seems to be an issue with some collectors and thus it is required
88         to initially name the file with a leading dot (.) until the file is closed
89         and ready to be read by external processes (marking it write-only seems
90         not to discourage them from trying!).
91 */
92 static int copy_unlink( char* old, char* new, int mode ) {
93         char    buf[8192];      // read buffer
94         char*   tfname;         // temp file name while we have it open
95         char*   wbuf;           // work buffer for disecting the new filename
96         char*   tok;            // token pointer into a buffer
97         int     len;
98         int     rfd;            // read/write file descriptors
99         int     wfd;
100         int start;
101         int     state;
102         int remain;             // number of bytes remaining to write
103
104
105         errno = 0;
106         if( (rfd = open( old, O_RDONLY )) < 0 ) {
107                 logit( LOG_ERR, "copy: open src for copy failed: %s: %s", old, strerror( errno ) );
108                 return -1;
109         }
110
111         len = sizeof( char ) * (strlen( new ) + 2 );                    // space needed for temp file name with added .
112         tfname = (char *) malloc( len );
113         wbuf = strdup( new );                                                                                                   // we need to trash the string, so copy
114         tok = strrchr( wbuf, '/' );                                                                                             // find end of path
115         if( tok ) {
116                 *tok = 0;
117                 tok++;
118                 snprintf( tfname, len, "%s/.%s", wbuf, tok );                                                   // insert . to "hide" from collector
119         } else {
120                 snprintf( tfname, len, ".%s", wbuf );                                                                   // no path, just add leading .
121         }
122         free( wbuf );
123         //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
124
125         if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
126                 logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s", tfname, strerror( errno ) );
127                 return -1;
128         }
129
130         while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) {
131                 remain = len;
132                 start = 0;
133                 while( remain > 0 ) {
134                         errno = 0;
135                         if( (len = write( wfd, &buf[start], len )) != remain ) {                // short write
136                                 if( errno != EINTR && errno != EAGAIN ) {
137                                         logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) );
138                                         close( wfd );
139                                         close( rfd );
140                                         return -1;
141                                 }
142                         }
143
144                         remain -= len;          // recompute what we need to write, and try again
145                         start += len;
146                 }
147         }
148
149         state = close( wfd );
150         close( rfd );
151
152         if( state < 0 ) {
153                 logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) );
154         } else {
155                 if( mode != 0 ) {
156                         chmod( tfname, mode );
157                 }
158                 //logit( LOG_INFO, "copy: moving tmp file to: %s", new );
159                 if( (state = rename( tfname, new )) < 0 ) {
160                         logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) );
161                 } else {
162                         if( unlink( old ) < 0 ) {
163                                 logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) );
164                         }
165                 }
166         }
167
168         free( tfname );
169         return state < 0 ? -1 : 0;
170 }
171
172 /*
173         Attempt to rename (move) the old file to the new file. If that fails with
174         the error EXDEV (not on same device), then we will copy the file. All
175         other errors returned by the rename() command are considered fatal and
176         copy is not attempted.  Returns 0 on success, -1 with errno set on
177         failure. See man page for rename for possible errno values and meanings.
178 */
179 static int mvocp( char* old, char* new ) {
180
181         chmod( old, 0644 );                                                             // give it proper mode before moving it
182
183         if( rename( old, new ) < 0 ) {
184                 if( errno != EXDEV ) {
185                         logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) );
186                         return -1;
187                 }
188
189                 return copy_unlink( old, new, 0644 );
190         }
191
192         return 0;
193 }
194
195 /*
196         Opens a new file for writing. Returns the fd. The context fname field will
197         in the context point to the basename (no suffix) on return.
198 */
199 static int rdc_open( void* vctx ) {
200         char    fname[2048];
201         char    basename[2048];
202         int             fd;
203         time_t  ts;                             // time stamp, rounded to previous frequency
204         rdc_ctx_t* ctx;
205
206         ctx = (rdc_ctx_t *) vctx;
207
208
209         if( ctx == NULL ) {
210                 return -1;
211         }
212
213         ts = time( NULL );
214         ts = ts - (ts % ctx->frequency);                        // round to previous frequency
215         ctx->next_roll = ts + ctx->frequency;           // set next time to roll the file
216
217         snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts );            // basename needed to build final file name at close
218         snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts );
219         fd = open( fname, O_WRONLY | O_CREAT, 0200 );           // open in w-- mode so that it should not be readable
220         if( fd < 0 ) {
221                 logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) );
222                 return fd;                                                                              // leave errno set by open attempt
223         }
224
225         lseek( fd, 0, SEEK_END );                                                       // if file existed, continue appending to it
226         logit( LOG_INFO, "(rdf) now writing to: %s", fname );
227         ctx->openname = strdup( fname );
228         ctx->basename = strdup( basename );
229         ctx->fd = fd;
230
231         return fd;
232 }
233
234 // ------------------ public things -------------------------------------------------------
235
236 /*
237         A generic log function such that if pressed to write logs using the log
238         library which yields completely unreadable json messages, we can comply
239         yet still have readable messages when testing. If the message type LOG_STAT
240         then the message is written to stdout rather than stderr.
241 */
242 extern void logit( int priority, char *fmt, ... ) {
243     va_list argp;
244     char ubuf[4*1024+1];                                // build user message here
245     char* pstr = "UNK";                                 // priority string
246         FILE*   dest = stderr;                          // where to write the message
247
248         switch( priority ) {
249                 case LOG_STAT:
250                         pstr = "STAT";
251                         dest = stdout;
252                         break;
253
254                 case LOG_CRIT:
255                         pstr = "CRI";
256                         break;
257
258                 case LOG_ERR:
259                         pstr = "ERR";
260                         break;
261
262                 case LOG_WARN:
263                         pstr = "WARN";
264                         break;
265
266                 default:
267                         pstr = "INFO";
268                         break;
269         }
270
271     va_start( argp, fmt );                                                              // point at first variable arguement
272     vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp );    // build the user portion of the message
273     va_end( argp );
274
275     fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf );
276 }
277
278
279 /*
280         Initialise the raw data collection facility.  The two directories, staging and final, may be the
281         same, and if different they _should_ reside on the same filesystem such that a move
282         (rename) operation is only the change in the directory and does not involve the copy
283         of bytes.
284
285         Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then
286         we assume that once the file is moved from staging to final, we create an empty
287         "done" file using the dsuffix.
288
289         A pointer to the context is returned; nil on error with errno set to some useful
290         (we hope) value.
291 */
292 extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) {
293         rdc_ctx_t*      ctx;
294         char*           ep;                     // pointer at environment var value
295
296         ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) );
297         if( ctx == NULL ) {
298                 errno = ENOMEM;
299                 return NULL;
300         }
301         memset( ctx, 0, sizeof( *ctx ) );
302
303         if( sdir == NULL ) {
304                 if( fdir == NULL ) {
305                         errno = EINVAL;         // must have at least one of these
306                         free( ctx );
307                         return NULL;
308                 }
309
310                 ctx->sdir = strdup( fdir );
311                 ctx->fdir = strdup( fdir );
312         } else {
313                 ctx->sdir = strdup( sdir );
314                 if( fdir == NULL ) {
315                         ctx->fdir = strdup( sdir );
316                 } else {
317                         ctx->fdir = strdup( fdir );
318                 }
319         
320         }
321
322         if( suffix ) {
323                 ctx->suffix = strdup( suffix );
324         }
325         if( dsuffix ) {
326                 ctx->dsuffix = strdup( dsuffix );
327         }
328
329         if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) {
330                 ctx->source = strdup( ep );
331         } else {
332                 ctx->source = strdup( "" );
333         }
334
335         ctx->frequency = 300;                   // default to 5 min roll
336         ctx->fd = -1;
337
338         return (void *) ctx;
339 }
340
341 /*
342         Allow the file rotation frequency to be set to something other
343         than the default.  A minimum of 10s is enforced, but there is
344         no maximum.
345 */
346 extern void rdc_set_freq( void* vctx, int freq ) {
347         rdc_ctx_t* ctx;
348
349         ctx = (rdc_ctx_t *) vctx;
350
351         if( ctx != NULL && freq >= 10 ) {
352                 ctx->frequency = freq;
353                 logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
354         } else {
355                 logit( LOG_ERR, "(rdc) file roll frequency was not set; ctx was nill or frequency was less than 10s" );
356         }
357 }
358
359 /*
360         Close the currently open file and move it to it's final resting place in fdir.
361         If the done suffix in the context is not nil, then we touch a done file which
362         has the same basename with the done suffix; this is also placed into the final
363         dir.
364 */
365 extern void rdc_close( void* vctx ) {
366         char    target[2048];
367         char*   t_suffix;
368         int             fd;
369         rdc_ctx_t* ctx;
370
371         ctx = (rdc_ctx_t *) vctx;
372         if( ctx == NULL || ctx->fd < 0 ) {
373                 return;
374         }
375
376         close( ctx->fd );               // future -- check for error
377         ctx->fd = -1;
378
379         t_suffix =  ctx->suffix != NULL  ? ctx->suffix : "";
380         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix );            // final target name
381         if( mvocp( ctx->openname, target ) < 0 ) {
382                 logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) );
383         } else {
384                 logit( LOG_INFO, "capture file closed and moved to: %s", target );
385                 if( ctx->dsuffix != NULL ) {                            // must also create a done file
386                         snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix );
387                         if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) {
388                                 close( fd );
389                                 logit( LOG_INFO, "created done file: %s", target );
390                         } else {
391                                 logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) );
392                         }
393                 }
394         }
395
396         free( ctx->basename );
397         free( ctx->openname );
398         ctx->basename = NULL;
399         ctx->openname = NULL;
400 }
401
402 /*
403         Writes the payload using the previously initialised capture buffer.
404         If it's time to roll the file, or the file isn't opened, the needed housekeeping
405         is done first.
406 */
407 extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) {
408         cap_buf_t* cb;
409         char    header[100];                                    // our header
410         rdc_ctx_t* ctx;
411
412         cb = (cap_buf_t *) vcb;
413         ctx = (rdc_ctx_t *) vctx;
414         if( ctx == NULL || cb == NULL) {
415                 errno = EINVAL;
416                 return -1;
417         }
418
419         if( time( NULL ) >= ctx->next_roll ) {
420                 rdc_close( ctx );                                       // close up the current one
421         }
422
423         if( ctx->fd < 0 ) {
424                 rdc_open( ctx );                                        // need a file, get it open
425         }
426
427         snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
428         write( ctx->fd, header, 20 );
429         write( ctx->fd, cb->uheader, cb->uhlen );
430         write( ctx->fd, payload, len );
431
432         return 0;               // future -- check and return error
433 }
434
435 /*
436         Initialise a capture buffer; The caller can pass in an old cb and we will reuse it.
437         We save the message type, and will use that and the user header length and payload
438         length on write to create the complete RDC header.
439 */
440 extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
441         cap_buf_t* cb;
442
443         cb = (cap_buf_t *) vcb;
444         if( cb == NULL ) {
445                 cb = (cap_buf_t *) malloc( sizeof( *cb ) );
446                 if( cb == NULL ) {
447                         errno = ENOMEM;
448                         return NULL;
449                 }
450         }
451
452         cb->mtype = mtype;
453         if(  uhlen > sizeof( cb->uheader ) ) {
454                 uhlen = sizeof( uheader );
455         }
456         memcpy( cb->uheader, uheader, uhlen );
457         cb->uhlen = uhlen;
458
459         return (void *) cb;
460 }
461
462 #ifdef SELF_TEST
463 /*
464         Run some quick checks which require the directories in /tmp to exist, and some
465         manual verification on the part of the tester.
466 */
467 int main( ) {
468         void*   ctx;                    // context
469         void*   cb = NULL;              // capture buffere
470         char*   uheader;
471         char*   payload;
472         int             i;
473
474         ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL );                     // does not create done files
475         //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" );        // will create a "done" file
476         if( ctx == NULL ) {
477                 logit( LOG_CRIT, "<TEST> abort! rdc_init returned a nil pointer" );
478                 exit( 1 );
479         }
480
481         rdc_set_freq( ctx, 60 );
482
483         logit( LOG_INFO, "<TEST> writing three messages" );
484         for( i = 0; i < 3; i++ ) {
485                 uheader = "@MRC==len==*---timestamp---*";
486                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
487                 payload = "this is a test and just a test if it were not a test you'd be notified";
488                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
489
490                 sleep( 1 );
491         }
492
493         logit( LOG_INFO, "<TEST> sleeping 80s to force a file change" );
494         sleep( 80 );
495         logit( LOG_INFO, "<TEST> sleep finished, writing thirteen messages" );
496         for( i = 0; i < 13; i++ ) {
497                 uheader = "@MRC==len==*---timestamp---*";
498                 cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb );
499                 payload = "this is a test and just a test if it were not a test you'd be notified";
500                 rdc_write( ctx, cb, payload, strlen( payload ) +1 );
501
502                 sleep( 1 );
503         }
504
505         rdc_close( ctx );                       // force move of the current file before exit
506 }
507 #endif