Cleanup listener based on sonar grumblings
[ric-app/mc.git] / sidecars / listener / mcl.c
index 00aee96..a50924b 100644 (file)
@@ -99,11 +99,9 @@ static void* setup_rdc() {
        char*   suffix = ".rdc";
        char*   done = NULL;
 
-       if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
-               if( ep != NULL  && atoi( ep ) == 0 ) {
-                       logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
-                       return NULL;
-               }
+       if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) {                                    // exists and is 0
+               logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" );
+               return NULL;
        }
 
        if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
@@ -139,7 +137,7 @@ static void* setup_rdc() {
        if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
                value = atoi( ep );
                logit( LOG_INFO, "setting frequency: %d", value );
-               rdc_set_freq( ctx, value );     
+               rdc_set_freq( ctx, value );
        }
        return ctx;
 }
@@ -150,16 +148,16 @@ static void* setup_rdc() {
                <delim><len><timestamp>
 
        Field lengths (bytes) are:
-               delim           4      
+               delim           4     
                len                     8       (7 digits + 0)
                timestamp       16  (15 digits + 0)
 
-       
+
        Timestamp is a single unsigned long long in ASCII; ms since epoch.
        If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
-       the timestamp generated will be 1570113591103. 
+       the timestamp generated will be 1570113591103.
 
-       The lenght and timestamp fields in the header are zero terminated so 
+       The lenght and timestamp fields in the header are zero terminated so
        they can be parsed as a string (atoi etc).
 */
 static char* build_hdr( int len, char* dest, int dest_len ) {
@@ -305,7 +303,9 @@ static inline void chalk_ok( fifo_t* fifo ) {
 
 /*
        Callback function driven to traverse the symtab and generate the
-       counts for each fifo.
+       counts for each fifo.  Sonar will complain about unused parameters which
+       are normal for callbacks. Further, sonar will grumble about st, and entry
+       not being const; we can't unless RMR proto for the callback changes.
 */
 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
        fifo_t* fifo;
@@ -395,7 +395,7 @@ extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
 /*
        Create the context.
 */
-extern void* mcl_mk_context( char* dir ) {
+extern void* mcl_mk_context( const char* dir ) {
        mcl_ctx_t*      ctx;
 
        if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
@@ -424,7 +424,6 @@ static void read_header( int fd, char* buf ) {
        int len;
        int need = MCL_EXHDR_SIZE;              // total needed
        int dneed;                                              // delimieter needed
-       int     rlen;
        char*   rp;                             // read position in buf
 
        len = read( fd, buf, need );
@@ -433,7 +432,7 @@ static void read_header( int fd, char* buf ) {
        }
 
        while( TRUE ) {
-               if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
+               if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
                        rp = buf + len;
                        dneed = strlen( MCL_DELIM ) - len;
 
@@ -457,11 +456,9 @@ static void read_header( int fd, char* buf ) {
                        return;
                }
 
-               while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
+               while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
                        len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
                }
-
-               need = MCL_EXHDR_SIZE - len;
        }
 }
 
@@ -491,7 +488,6 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
        int need;
        char wbuf[4096];
        mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
-       fifo_t* fref = NULL;                            // the fifo struct we found
 
        if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
                errno = EINVAL;
@@ -510,7 +506,7 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
                                *timestamp = 0;
                        }
 
-                       len = read( fd, wbuf, MCL_LEN_SIZE );                   // we assume we will get all 8 as there isn't a way to sync the old stream
+                       read( fd, wbuf, MCL_LEN_SIZE );                                 // we assume we will get all 8 bytes as there isn't a way to sync the old stream
                        msg_len = need = atoi( wbuf );
                }
 
@@ -526,7 +522,7 @@ static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hd
                }
 
                if( msg_len > got ) {                                   // we must ditch rest of this message
-                       need = msg_len = got;
+                       need = msg_len - got;
                        while( need > 0 ) {
                                len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
                                need -= len;
@@ -578,7 +574,7 @@ extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int l
        timestamp.
 
        The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
-       message.  When the health check message is received it is responded to 
+       message.  When the health check message is received it is responded to
        with the current state of processing (ok or err).
 */
 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
@@ -586,16 +582,15 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
        fifo_t*         fifo;                                   // fifo to chalk counts on
        rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
        char            header[128];                    // header we'll pop in front of the payload
-       int                     state;
        int                     fd;                                             // file des to write to
        long long       total = 0;                              // total messages received and written
        long long       total_drops = 0;                // total messages received and written
        long            count = 0;                              // messages received and written during last reporting period
        long            errors = 0;                             // unsuccessful payload writes
-       long            drops;                                  // number of drops
+       long            drops = 0;                              // number of drops
        time_t          next_report = 0;                // we'll report every 2 seconds if report is true
        time_t          now;
-       int                     hwlen;                                  // write len for header
+       size_t          hwlen;                                  // write len for header
        void*           rdc_ctx = NULL;                 // raw data capture context
        void*           rdc_buf = NULL;                 // capture buffer
 
@@ -619,23 +614,25 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
                                mbuf->mtype = RIC_HEALTH_CHECK_RESP;            // if we're here we are running and all is ok
                                mbuf->sub_id = -1;
                                mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE );  // ensure payload is large enough
-                               strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
-                               rmr_rts_msg( ctx->mrc, mbuf );
+                               if( mbuf->payload != NULL ) {
+                                       strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
+                                       rmr_rts_msg( ctx->mrc, mbuf );
+                               }
                                continue;
                        }
-                       
+
                        if( mbuf->len > 0  ) {
+                               if( long_hdr ) {
+                                       build_hdr( mbuf->len, header, sizeof( header ) );
+                                       hwlen = MCL_EXHDR_SIZE;
+                               } else {
+                                       snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+                                       hwlen = MCL_LEN_SIZE;
+                               }
+
                                fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
                                if( fd >= 0 ) {
-                                       if( long_hdr ) {
-                                               build_hdr( mbuf->len, header, sizeof( header ) );
-                                               hwlen = MCL_EXHDR_SIZE;
-                                       } else {
-                                               snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
-                                               hwlen = MCL_LEN_SIZE;
-                                       }
-
-                                       if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
+                                       if( write( fd, header, hwlen ) != hwlen ) {                     // write exactly MCL_LEN_SIZE bytes from the buffer
                                                drops++;
                                                total_drops++;
                                                chalk_error( fifo );
@@ -653,14 +650,14 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
 
                                if( rdc_ctx != NULL ) {
                                        rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
-                                       rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
+                                       rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                // write the raw data
                                }
                        }
                }
 
                if( report ) {
                        if( (now = time( NULL ) ) > next_report ) {
-                       rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
+                               rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
                                fflush( stdout );
 
                                logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
@@ -668,11 +665,17 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
                                next_report = now + report;
                                count = 0;
                                drops = 0;
+                               errors = 0;
 
                                fflush( stdout );
                        }
                }
-       } while( FOREVER );                             // forever allows for escape during unit testing
+
+               if( ! FOREVER ) {                       // allow escape during unit tests; compiled out othewise, but sonar won't see that
+                       free( rdc_buf );
+                       break;                                  // sonar grumbles if we put FOREVER into the while; maddening
+               }
+       } while( 1 );
 }
 
 
@@ -680,13 +683,13 @@ extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
        Given a buffer and length, along with the message type, look up the fifo and write
        the buffer. Returns 0 on error; 1 on success.
 */
-extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
+extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) {
        mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
        fifo_t*         fifo;                                   // fifo to chalk counts on
-       int                     state = -1;
+       size_t          state = -1;
        int                     fd;                                             // file des to write to
 
-       if( plen <= 0 ) {
+       if( plen <= 0  || payload == NULL ) {
                return 1;
        }
 
@@ -698,7 +701,7 @@ extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
        fd = suss_fifo( ctx, mtype, WRITER, &fifo );            // map the message type to an open fd
        if( fd >= 0 ) {
                state = write( fd, payload, plen );
-       } 
+       }
 
-       return state == plen;
+       return state == (size_t) plen;
 }