X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sidecars%2Flistener%2Fsrc%2Fmcl.c;fp=sidecars%2Flistener%2Fsrc%2Fmcl.c;h=60346ba749a79e091c4803480832672a0be843d4;hb=71b42c4cc711c1f917c4fa3d180aaa217a1a7196;hp=0077e96d07f284bad8c3d294c555d9ad567fe544;hpb=75de6f142205ac558a6ca1e5c875a674e8973008;p=ric-app%2Fmc.git diff --git a/sidecars/listener/src/mcl.c b/sidecars/listener/src/mcl.c index 0077e96..60346ba 100644 --- a/sidecars/listener/src/mcl.c +++ b/sidecars/listener/src/mcl.c @@ -100,7 +100,7 @@ static void* setup_rdc() { char* done = NULL; if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) { // exists and is 0 - logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" ); + logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)" ); return NULL; } @@ -148,7 +148,7 @@ static void* setup_rdc() { Field lengths (bytes) are: - delim 4 + delim 4 len 8 (7 digits + 0) timestamp 16 (15 digits + 0) @@ -275,6 +275,10 @@ static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) { fifo = NULL; } } + } else { + if( fifo->fd < 0 ) { // it existed, but was closed; reopen + fifo->fd = open_fifo( ctx, mtype, io_dir ); + } } if( fref != NULL ) { @@ -284,6 +288,32 @@ static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) { return fifo == NULL ? -1 : fifo->fd; } +/* + Should we need to close a FIFO we do so and leave the block in the hash + with a bad FD so that we'll attempt to reopen on next use. +*/ +static void close_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) { + fifo_t* fifo; + void* hash; + + if( ctx == NULL ) { + return; + } + + if( io_dir == READER ) { // with an integer key, we need two hash tables + hash = ctx->rd_hash; + } else { + hash = ctx->wr_hash; + } + + if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) != NULL ) { + if( fifo->fd >= 0 ) { + close( fifo->fd ); + fifo->fd = -1; + } + } +} + /* Make marking counts easier in code */ @@ -334,6 +364,69 @@ static void wr_stats( void* st, void* entry, char const* name, void* thing, void } } +/* + Writes the indicated bytes (n2write) from buf onto the fd. Returns only after + full buffer is written, or there is a hard error (not eagain or eintr). + Returns the number written; if less than n2write the caller may assume + that there was a hard error and errno should reflect. +*/ +static inline int write_all( int fd, char const* buf, int n2write ) { + ssize_t remain = 0; // number of bytes remaining to write + ssize_t wrote = 0; // number of bytes written thus far + ssize_t state = 0; + + if( fd < 0 ) { + errno = EBADFD; + return 0; + } + + errno = 0; + remain = n2write; + do { + if( (state = write( fd, buf + wrote, remain )) > 0 ) { + wrote += state; + remain = n2write - wrote; + } + } while( remain > 0 && (errno == EINTR || errno == EAGAIN) ) ; + + return wrote; +} + +/* + Similar to write_all, this will write all bytes in the buffer, but + will return failure if the first write attempt fails with 0 written + (assuming that the pipe has no reader). We use this when writing the + header bytes; we want to drop the message if we can't even write one + byte, but if we write one, we must loop until all are written. + + Returns the number written. If that value is less than n2write, then + the caller may assume a hard error occurred and errno should reflect. + If 0 is returned it can be assumed that the FIFO would block/has no + reader. +*/ +static inline int write_all_nb( int fd, char const* buf, int n2write ) { + ssize_t remain = 0; // number of bytes remaining to write + ssize_t wrote = 0; // number of bytes written + + if( fd < 0 ) { + errno = EBADFD; + return 0; + } + + errno = 0; + remain = n2write; + wrote = write( fd, buf, remain ); + if( wrote < 0 ) { // report error with exception for broken pipe + return errno == EPIPE ? 0 : -1; // broken pipe we assume no reader and return 0 since nothing written + } + + if( wrote < n2write && wrote > 0 ) { // if we wrote anything, we must tough it out and write all if it was short + wrote += write_all( fd, buf + wrote, n2write - wrote ); + } + + return wrote; +} + // ---------- public ------------------------------------------------------ /* Sets a signal handler for sigpipe so we don't crash if a reader closes the @@ -601,6 +694,7 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { time_t next_report = 0; // we'll report every 2 seconds if report is true time_t now; size_t hwlen; // write len for header + size_t wrote; // number of bytes actually written void* rdc_ctx = NULL; // raw data capture context void* rdc_buf = NULL; // capture buffer @@ -640,25 +734,32 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) { hwlen = MCL_LEN_SIZE; } - fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd + fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd if( fd >= 0 ) { - if( write( fd, header, hwlen ) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer + if( (wrote = write_all_nb( fd, header, hwlen )) == 0 ) { // write header; 0 indicates no reader, drop silently drops++; total_drops++; chalk_error( fifo ); } else { - if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload + if( wrote != hwlen ) { + logit( LOG_ERR, "(mcl): error writing header to fifo; mt=%d wrote=%d tried=%d: %s", mbuf->mtype, wrote, hwlen, strerror( errno ) ); errors++; chalk_error( fifo ); + close_fifo( ctx, mbuf->mtype, WRITER ); } else { - chalk_ok( fifo ); - count++; - total++; + if( write_all( fd, mbuf->payload, mbuf->len ) != mbuf->len ) { // we wrote a header, so we must write all; no drop at this point + logit( LOG_ERR, "(mcl): error writing payload to fifo; mt=%d: %s\n", mbuf->mtype, strerror( errno ) ); + close_fifo( ctx, mbuf->mtype, WRITER ); + } else { + chalk_ok( fifo ); + count++; + total++; + } } } } - if( rdc_ctx != NULL ) { + if( rdc_ctx != NULL ) { // always put the message to the rdc files if collecting; eve if pipe write failed rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data }