X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fsidecars%2Flistener%2Frdc.c;fp=src%2Fsidecars%2Flistener%2Frdc.c;h=bf7300e08cf48237fcfdb8df7eb91d252f7b4cdd;hb=53a0c6d44d66556623c6aee61eb7b6de9c4fd41b;hp=0000000000000000000000000000000000000000;hpb=9c6bfeaa83dfcae7e2ed6998f8a645ea49a47bcb;p=ric-app%2Fmc.git diff --git a/src/sidecars/listener/rdc.c b/src/sidecars/listener/rdc.c new file mode 100644 index 0000000..bf7300e --- /dev/null +++ b/src/sidecars/listener/rdc.c @@ -0,0 +1,504 @@ +// vim: ts=4 sw=4 noet: +/* +-------------------------------------------------------------------------------- + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +-------------------------------------------------------------------------------- +*/ + +/* + Mnemonic: rdc.c. + Abstract: This module contains library functions which implement the + raw data collection (rdc) portion of the listener. The fanout + function in the library will call these functions to capture + raw messages, with a header, for later "replay" or other + analysis. Messages are captured as they are written to the + FIFO, with a small header added: + @RDC + + Where is the message type of the message received and + is the length of the data that was written to the FIFO. + + + Date: 06 Oct 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mcl.h" + +/* + A capture buffer. The listener writes FIFO output in two stages, thus + we provide the ability to initialise a capture with the msg type and + the MRL header, then to write the payload using this saved data. The + idea is to not require the caller to save the header. +*/ +typedef struct { + char uheader[100]; // user header (max 100 bytes) + int uhlen; // length of user header + int mtype; // message type +} cap_buf_t; + +typedef struct { + int flags; // DFFL_* constatnts + int frequency; // the frequency at which files are rolled + int fd; // current open write file + char* sdir; // staging directory + char* fdir; // final directory + char* suffix; // suffix for final file (must include . if needed) + char* dsuffix; // suffix for done file + char* basename; // base name of the file being written to + char* openname; // full filename that is open for writing + char* source; // added to output file names to differentiate the source + time_t next_roll; // time we should roll the file +} rdc_ctx_t; + +#define RDC_DELIM "@RDC" // delimeter used in our file + +// ------------------------------------------------------------------------------------------- + +/* + Copy and unlink old file is successful. During writing the file mode will + be write only for the owner (0200). If the mode passed in is not 0, then + just prior to renaming the file to 'new', the mode will be changed. If + mode is 0, then we assume the caller will change the file mode when + appropriate. + + There seems to be an issue with some collectors and thus it is required + to initially name the file with a leading dot (.) until the file is closed + and ready to be read by external processes (marking it write only seems + not to discourage them from trying!). +*/ +static int copy_unlink( char* old, char* new, int mode ) { + char buf[8192]; // read buffer + char* tfname; // temp file name while we have it open + char* wbuf; // work buffer for disecting the new filename + char* tok; // token pointer into a buffer + int len; + int rfd; // read/write file descriptors + int wfd; + int start; + int state; + int remain; // number of bytes remaining to write + + + errno = 0; + if( (rfd = open( old, O_RDONLY )) < 0 ) { + logit( LOG_ERR, "copy: open src for copy failed: %s: %s\n", old, strerror( errno ) ); + return -1; + } + + len = sizeof( char ) * (strlen( new ) + 2 ); // space needed for temp file name with added . + tfname = (char *) malloc( len ); + wbuf = strdup( new ); // we need to trash the string, so copy + tok = strrchr( wbuf, '/' ); // find end of path + if( tok ) { + *tok = 0; + tok++; + snprintf( tfname, len, "%s/.%s", wbuf, tok ); // insert . to "hide" from collector + } else { + snprintf( tfname, len, ".%s", wbuf ); // no path, just add leading . + } + //logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname ); + + if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) { + logit( LOG_ERR, "copy: open tmp file for copy failed: %s: %s\n", tfname, strerror( errno ) ); + return -1; + } + + while( (len = read( rfd, buf, sizeof( buf ) )) > 0 ) { + remain = len; + start = 0; + while( remain > 0 ) { + errno = 0; + if( (len = write( wfd, &buf[start], len )) != remain ) { // short write + if( errno != EINTR && errno != EAGAIN ) { + logit( LOG_ERR, "copy: write failed: %s", strerror( errno ) ); + close( wfd ); + close( rfd ); + return -1; + } + } + + remain -= len; // recompute what we need to write, and try again + start += len; + } + } + + state = close( wfd ); + close( rfd ); + + if( state < 0 ) { + logit( LOG_ERR, "copy: close reported error: %s", strerror( errno ) ); + } else { + if( mode != 0 ) { + chmod( tfname, mode ); + } + //logit( LOG_INFO, "copy: moving tmp file to: %s", new ); + if( (state = rename( tfname, new )) < 0 ) { + logit( LOG_WARN, "copy: rename of tmp to final name failed for %s -> %s: %s", tfname, new, strerror( errno ) ); + } else { + if( unlink( old ) < 0 ) { + logit( LOG_WARN, "copy: unlink failed for %s: %s", old, strerror( errno ) ); + } + } + } + + return state < 0 ? -1 : 0; +} + +/* + Attempt to rename (move) the old file to the new file. If that fails with + the error EXDEV (not on same device), then we will copy the file. All + other errors returned by the rename() command are considered fatal and + copy is not attempted. Returns 0 on success, -1 with errno set on + failure. See man page for rename for possible errno values and meanings. +*/ +static int mvocp( char* old, char* new ) { + + chmod( old, 0644 ); // give it proper mode before moving it + + if( rename( old, new ) < 0 ) { + if( errno != EXDEV ) { + logit( LOG_ERR, "mvocp: cannot move %s to %s: %s", old, new, strerror( errno ) ); + return -1; + } + + return copy_unlink( old, new, 0644 ); + } + + return 0; +} + +/* + Opens a new file for writing. Returns the fd. The context fname field will + in the context point to the basename (no suffix) on return. +*/ +static int rdc_open( void* vctx ) { + char fname[2048]; + char basename[2048]; + int fd; + time_t ts; // time stamp, rounded to previous frequency + rdc_ctx_t* ctx; + + ctx = (rdc_ctx_t *) vctx; + + + if( ctx == NULL ) { + return -1; + } + + ts = time( NULL ); + ts = ts - (ts % ctx->frequency); // round to previous frequency + ctx->next_roll = ts + ctx->frequency; // set next time to roll the file + + snprintf( basename, sizeof( fname ), "MCLT%s_%ld", ctx->source, (long) ts ); // basename needed to build final file name at close + snprintf( fname, sizeof( fname ), "%s/MCLT_%ld", ctx->sdir, (long) ts ); + fd = open( fname, O_WRONLY | O_CREAT, 0200 ); // open in w-- mode so that it should not be readable + if( fd < 0 ) { + logit( LOG_CRIT, "(rdf) cannot open data capture file: %s: %s", fname, strerror( errno ) ); + return fd; // leave errno set by open attempt + } + + lseek( fd, 0, SEEK_END ); // if file existed, continue appending to it + logit( LOG_INFO, "(rdf) now writing to: %s", fname ); + ctx->openname = strdup( fname ); + ctx->basename = strdup( basename ); + ctx->fd = fd; + + return fd; +} + +// ------------------ public things ------------------------------------------------------- + +/* + A generic log function such that if pressed to write logs using the log + library which yields completely unreadable json messages, we can comply + yet still have readable messages when testing. If the message type LOG_STAT + then the message is written to stdout rather than stderr. +*/ +extern void logit( int priority, char *fmt, ... ) { + va_list argp; + char ubuf[4*1024+1]; // build user message here + char* pstr = "UNK"; // priority string + FILE* dest = stderr; // where to write the message + + switch( priority ) { + case LOG_STAT: + pstr = "STAT"; + dest = stdout; + break; + + case LOG_CRIT: + pstr = "CRI"; + break; + + case LOG_ERR: + pstr = "ERR"; + break; + + case LOG_WARN: + pstr = "WARN"; + break; + + default: + pstr = "INFO"; + break; + } + + va_start( argp, fmt ); // point at first variable arguement + vsnprintf( ubuf, sizeof( ubuf ) -1, fmt, argp ); // build the user portion of the message + va_end( argp ); + + fprintf( dest, "%ld [%s] %s\n", time( NULL ), pstr, ubuf ); +} + + +/* + Initialise the raw data collection facility. The two directories, staging and final, may be the + same, and if different they _should_ reside on the same filesystem such that a move + (rename) operation is only the change in the directory and does not involve the copy + of bytes. + + Either suffix may be a nil pointer. If the done suffix (dsuffix) is not nil, then + we assume that once the file is moved from staging to final, we create an empty + "done" file using the dsuffix. + + A pointer to the context is returned; nil on error with errno set to some useful + (we hope) value. +*/ +extern void* rdc_init( char* sdir, char* fdir, char* suffix, char* dsuffix ) { + rdc_ctx_t* ctx; + char* ep; // pointer at environment var value + + ctx = (rdc_ctx_t *) malloc( sizeof( *ctx ) ); + if( ctx == NULL ) { + errno = ENOMEM; + return NULL; + } + memset( ctx, 0, sizeof( *ctx ) ); + + if( sdir == NULL ) { + if( fdir == NULL ) { + errno = EINVAL; // must have at least one of these + free( ctx ); + return NULL; + } + + ctx->sdir = strdup( fdir ); + ctx->fdir = strdup( fdir ); + } else { + ctx->sdir = strdup( sdir ); + if( fdir == NULL ) { + ctx->fdir = strdup( sdir ); + } else { + ctx->fdir = strdup( fdir ); + } + + } + + if( suffix ) { + ctx->suffix = strdup( suffix ); + } + if( dsuffix ) { + ctx->dsuffix = strdup( dsuffix ); + } + + if( (ep = getenv( "MCL_RDC_SOURCE")) != NULL ) { + ctx->source = strdup( ep ); + } else { + ctx->source = strdup( "" ); + } + + ctx->frequency = 300; // default to 5 min roll + ctx->fd = -1; + + return (void *) ctx; +} + +/* + Allow the file rotation frequency to be set to something other + than the default. A minimum of 10s is enforced, but there is + no maximum. +*/ +extern void rdc_set_freq( void* vctx, int freq ) { + rdc_ctx_t* ctx; + + ctx = (rdc_ctx_t *) vctx; + + if( ctx != NULL && freq > 10 ) { + ctx->frequency = freq; + } + + logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency ); +} + +/* + Close the currently open file and move it to it's final resting place in fdir. + If the done suffix in the context is not nil, then we touch a done file which + has the same basename with the done suffix; this is also placed into the final + dir. +*/ +extern void rdc_close( void* vctx ) { + char target[2048]; + char* t_suffix; + int fd; + rdc_ctx_t* ctx; + + ctx = (rdc_ctx_t *) vctx; + if( ctx == NULL || ctx->fd < 0 ) { + return; + } + + close( ctx->fd ); // future -- check for error + ctx->fd = -1; + + t_suffix = ctx->suffix != NULL ? ctx->suffix : ""; + snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, t_suffix ); // final target name + if( mvocp( ctx->openname, target ) < 0 ) { + logit( LOG_ERR, "(rdf) unable to move file '%s' to '%s': %s", ctx->openname, target, strerror( errno ) ); + } else { + logit( LOG_INFO, "capture file closed and moved to: %s", target ); + if( ctx->dsuffix != NULL ) { // must also create a done file + snprintf( target, sizeof( target ), "%s/%s%s", ctx->fdir, ctx->basename, ctx->dsuffix ); + if( (fd = open( target, O_CREAT, 0664 )) >= 0 ) { + close( fd ); + logit( LOG_INFO, "created done file: %s", target ); + } else { + logit( LOG_ERR, "unable to create done file: %s", target, strerror( errno ) ); + } + } + } + + free( ctx->basename ); + free( ctx->openname ); + ctx->basename = NULL; + ctx->openname = NULL; +} + +/* + Writes the payload using the previously initialised capture buffer. + If it's time to roll the file, or the file isn't opened, the needed housekeeping + is done first. +*/ +extern int rdc_write( void* vctx, void* vcb, char* payload, int len ) { + cap_buf_t* cb; + char header[100]; // our header + rdc_ctx_t* ctx; + + cb = (cap_buf_t *) vcb; + ctx = (rdc_ctx_t *) vctx; + if( ctx == NULL || cb == NULL) { + errno = EINVAL; + return -1; + } + + if( time( NULL ) >= ctx->next_roll ) { + rdc_close( ctx ); // close up the current one + } + + if( ctx->fd < 0 ) { + rdc_open( ctx ); // need a file, get it open + } + + snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len ); + write( ctx->fd, header, 20 ); + write( ctx->fd, cb->uheader, cb->uhlen ); + write( ctx->fd, payload, len ); + + return 0; // future -- check and return error +} + +/* + Initialise a capture buffer; The caller can pass in an old cb and we will reuse it. + We save the message type, and will use that and the user header length and payload + length on write to create the complete RDC header. +*/ +extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) { + cap_buf_t* cb; + + cb = (cap_buf_t *) vcb; + if( cb == NULL ) { + cb = (cap_buf_t *) malloc( sizeof( *cb ) ); + if( cb == NULL ) { + errno = ENOMEM; + return NULL; + } + } + + cb->mtype = mtype; + if( uhlen > sizeof( cb->uheader ) ) { + uhlen = sizeof( uheader ); + } + memcpy( cb->uheader, uheader, uhlen ); + cb->uhlen = uhlen; + + return (void *) cb; +} + +#ifdef SELF_TEST +/* + Run some quick checks which require the directories in /tmp to exist, and some + manual verification on the part of the tester. +*/ +int main( ) { + void* ctx; // context + void* cb = NULL; // capture buffere + char* uheader; + char* payload; + int i; + + ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL ); // does not create done files + //ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" ); // will create a "done" file + if( ctx == NULL ) { + logit( LOG_CRIT, " abort! rdc_init returned a nil pointer\n" ); + exit( 1 ); + } + + rdc_set_freq( ctx, 60 ); + + logit( LOG_INFO, " writing three messages" ); + for( i = 0; i < 3; i++ ) { + uheader = "@MRC==len==*---timestamp---*"; + cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb ); + payload = "this is a test and just a test if it were not a test you'd be notified"; + rdc_write( ctx, cb, payload, strlen( payload ) +1 ); + + sleep( 1 ); + } + + logit( LOG_INFO, " sleeping 80s to force a file change" ); + sleep( 80 ); + logit( LOG_INFO, " sleep finished, writing thirteen messages" ); + for( i = 0; i < 13; i++ ) { + uheader = "@MRC==len==*---timestamp---*"; + cb = rdc_init_buf( ctx, 220 + i, uheader, strlen( uheader ), cb ); + payload = "this is a test and just a test if it were not a test you'd be notified"; + rdc_write( ctx, cb, payload, strlen( payload ) +1 ); + + sleep( 1 ); + } + + rdc_close( ctx ); // force move of the current file before exit +} +#endif