Correct bug in listener not detecting eagain/eintr correctly
[ric-app/mc.git] / sidecars / listener / src / mcl.c
index 0077e96..60346ba 100644 (file)
@@ -100,7 +100,7 @@ static void* setup_rdc() {
        char*   done = NULL;
 
        if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) {                                    // exists and is 0
-               logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" );
+               logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)" );
                return NULL;
        }
 
@@ -148,7 +148,7 @@ static void* setup_rdc() {
                <delim><len><timestamp>
 
        Field lengths (bytes) are:
-               delim           4     
+               delim           4    
                len                     8       (7 digits + 0)
                timestamp       16  (15 digits + 0)
 
@@ -275,6 +275,10 @@ static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
                                fifo = NULL;
                        }
                }
+       } else {
+               if( fifo->fd < 0 ) {                            // it existed, but was closed; reopen
+                       fifo->fd = open_fifo( ctx, mtype, io_dir );
+               }
        }
 
        if( fref != NULL ) {
@@ -284,6 +288,32 @@ static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
        return fifo == NULL ? -1 : fifo->fd;
 }
 
+/*
+       Should we need to close a FIFO we do so and leave the block in the hash
+       with a bad FD so that we'll attempt to reopen on next use.
+*/
+static void close_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
+       fifo_t* fifo;
+       void*   hash;
+
+       if( ctx == NULL ) {
+               return;
+       }
+
+       if( io_dir == READER ) {                // with an integer key, we need two hash tables
+               hash = ctx->rd_hash;
+       } else {
+               hash = ctx->wr_hash;
+       }
+
+       if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) != NULL ) {
+               if( fifo->fd >= 0 ) {
+                       close( fifo->fd );
+                       fifo->fd = -1;
+               }
+       }
+}
+
 /*
        Make marking counts easier in code
 */
@@ -334,6 +364,69 @@ static void wr_stats( void* st, void* entry, char const* name, void* thing, void
        }
 }
 
+/*
+       Writes the indicated bytes (n2write) from buf onto the fd. Returns only after
+       full buffer is written, or there is a hard error (not eagain or eintr).
+       Returns the number written; if less than n2write the caller may assume
+       that there was a hard error and errno should reflect.
+*/
+static inline int write_all( int fd, char const* buf, int n2write ) {
+       ssize_t remain = 0;                     // number of bytes remaining to write
+       ssize_t wrote = 0;                      // number of bytes written thus far
+       ssize_t state = 0;
+
+       if( fd < 0 ) {
+               errno = EBADFD;
+               return 0;
+       }
+
+       errno = 0;
+       remain = n2write;
+       do {
+               if( (state = write( fd, buf + wrote, remain )) > 0 ) {
+                       wrote += state;
+                       remain = n2write - wrote;
+               }
+       } while( remain > 0 && (errno == EINTR || errno == EAGAIN) ) ;
+
+       return wrote;
+}
+
+/*
+       Similar to write_all, this will write all bytes in the buffer, but
+       will return failure if the first write attempt fails with 0 written
+       (assuming that the pipe has no reader). We use this when writing the
+       header bytes; we want to drop the message if we can't even write one
+       byte, but if we write one, we must loop until all are written.
+
+       Returns the number written. If that value is less than n2write, then
+       the caller may assume a hard error occurred and errno should reflect.
+       If 0 is returned it can be assumed that the FIFO would block/has no
+       reader.
+*/
+static inline int write_all_nb( int fd, char const* buf, int n2write ) {
+       ssize_t remain = 0;                     // number of bytes remaining to write
+       ssize_t wrote = 0;                      // number of bytes written
+
+       if( fd < 0 ) {
+               errno = EBADFD;
+               return 0;
+       }
+
+       errno = 0;
+       remain = n2write;
+       wrote = write( fd, buf, remain );
+       if( wrote < 0 ) {                                                               // report error with exception for broken pipe
+               return errno == EPIPE ? 0 : -1;                         // broken pipe we assume no reader and return 0 since nothing written
+       }
+
+       if( wrote < n2write  &&  wrote > 0 ) {                  // if we wrote anything, we must tough it out and write all if it was short
+               wrote +=  write_all( fd, buf + wrote, n2write - wrote );
+       }
+
+       return wrote;
+}
+
 // ---------- public ------------------------------------------------------
 /*
        Sets a signal handler for sigpipe so we don't crash if a reader closes the
@@ -601,6 +694,7 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
        time_t          next_report = 0;                // we'll report every 2 seconds if report is true
        time_t          now;
        size_t          hwlen;                                  // write len for header
+       size_t          wrote;                                  // number of bytes actually written
        void*           rdc_ctx = NULL;                 // raw data capture context
        void*           rdc_buf = NULL;                 // capture buffer
 
@@ -640,25 +734,32 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
                                        hwlen = MCL_LEN_SIZE;
                                }
 
-                               fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
+                               fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );                                      // map the message type to an open fd
                                if( fd >= 0 ) {
-                                       if( write( fd, header, hwlen ) != hwlen ) {                     // write exactly MCL_LEN_SIZE bytes from the buffer
+                                       if( (wrote = write_all_nb( fd, header, hwlen )) == 0 ) {                // write header; 0 indicates no reader, drop silently
                                                drops++;
                                                total_drops++;
                                                chalk_error( fifo );
                                        } else {
-                                               if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
+                                               if( wrote != hwlen ) {
+                                                       logit( LOG_ERR, "(mcl): error writing header to fifo; mt=%d wrote=%d tried=%d: %s", mbuf->mtype, wrote, hwlen, strerror( errno ) );
                                                        errors++;
                                                        chalk_error( fifo );
+                                                       close_fifo( ctx, mbuf->mtype, WRITER );
                                                } else {
-                                                       chalk_ok( fifo );
-                                                       count++;
-                                                       total++;
+                                                       if( write_all( fd, mbuf->payload, mbuf->len ) != mbuf->len ) {          // we wrote a header, so we must write all; no drop at this point
+                                                               logit( LOG_ERR, "(mcl): error writing payload to fifo; mt=%d: %s\n", mbuf->mtype, strerror( errno ) );
+                                                               close_fifo( ctx, mbuf->mtype, WRITER );
+                                                       } else {
+                                                               chalk_ok( fifo );
+                                                               count++;
+                                                               total++;
+                                                       }
                                                }
                                        }
                                }
 
-                               if( rdc_ctx != NULL ) {
+                               if( rdc_ctx != NULL ) {                                         // always put the message to the rdc files if collecting; eve if pipe write failed
                                        rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
                                        rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                // write the raw data
                                }