char* suffix = ".rdc";
char* done = NULL;
- if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
- if( ep != NULL && atoi( ep ) == 0 ) {
- logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
- return NULL;
- }
+ if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) { // exists and is 0
+ logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" );
+ return NULL;
}
if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
value = atoi( ep );
logit( LOG_INFO, "setting frequency: %d", value );
- rdc_set_freq( ctx, value );
+ rdc_set_freq( ctx, value );
}
return ctx;
}
<delim><len><timestamp>
Field lengths (bytes) are:
- delim 4
+ delim 4
len 8 (7 digits + 0)
timestamp 16 (15 digits + 0)
-
+
Timestamp is a single unsigned long long in ASCII; ms since epoch.
If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
- the timestamp generated will be 1570113591103.
+ the timestamp generated will be 1570113591103.
- The lenght and timestamp fields in the header are zero terminated so
+ The lenght and timestamp fields in the header are zero terminated so
they can be parsed as a string (atoi etc).
*/
static char* build_hdr( int len, char* dest, int dest_len ) {
/*
Callback function driven to traverse the symtab and generate the
- counts for each fifo.
+ counts for each fifo. Sonar will complain about unused parameters which
+ are normal for callbacks. Further, sonar will grumble about st, and entry
+ not being const; we can't unless RMR proto for the callback changes.
*/
static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
fifo_t* fifo;
/*
Create the context.
*/
-extern void* mcl_mk_context( char* dir ) {
+extern void* mcl_mk_context( const char* dir ) {
mcl_ctx_t* ctx;
if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
int len;
int need = MCL_EXHDR_SIZE; // total needed
int dneed; // delimieter needed
- int rlen;
char* rp; // read position in buf
len = read( fd, buf, need );
}
while( TRUE ) {
- if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
+ if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
rp = buf + len;
dneed = strlen( MCL_DELIM ) - len;
return;
}
- while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
+ while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
}
-
- need = MCL_EXHDR_SIZE - len;
}
}
int need;
char wbuf[4096];
mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
- fifo_t* fref = NULL; // the fifo struct we found
if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
errno = EINVAL;
*timestamp = 0;
}
- len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream
+ read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 bytes as there isn't a way to sync the old stream
msg_len = need = atoi( wbuf );
}
}
if( msg_len > got ) { // we must ditch rest of this message
- need = msg_len = got;
+ need = msg_len - got;
while( need > 0 ) {
len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
need -= len;
timestamp.
The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
- message. When the health check message is received it is responded to
+ message. When the health check message is received it is responded to
with the current state of processing (ok or err).
*/
extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
fifo_t* fifo; // fifo to chalk counts on
rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
char header[128]; // header we'll pop in front of the payload
- int state;
int fd; // file des to write to
long long total = 0; // total messages received and written
long long total_drops = 0; // total messages received and written
long count = 0; // messages received and written during last reporting period
long errors = 0; // unsuccessful payload writes
- long drops; // number of drops
+ long drops = 0; // number of drops
time_t next_report = 0; // we'll report every 2 seconds if report is true
time_t now;
- int hwlen; // write len for header
+ size_t hwlen; // write len for header
void* rdc_ctx = NULL; // raw data capture context
void* rdc_buf = NULL; // capture buffer
mbuf->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
mbuf->sub_id = -1;
mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE ); // ensure payload is large enough
- strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
- rmr_rts_msg( ctx->mrc, mbuf );
+ if( mbuf->payload != NULL ) {
+ strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
+ rmr_rts_msg( ctx->mrc, mbuf );
+ }
continue;
}
-
+
if( mbuf->len > 0 ) {
+ if( long_hdr ) {
+ build_hdr( mbuf->len, header, sizeof( header ) );
+ hwlen = MCL_EXHDR_SIZE;
+ } else {
+ snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+ hwlen = MCL_LEN_SIZE;
+ }
+
fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
if( fd >= 0 ) {
- if( long_hdr ) {
- build_hdr( mbuf->len, header, sizeof( header ) );
- hwlen = MCL_EXHDR_SIZE;
- } else {
- snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
- hwlen = MCL_LEN_SIZE;
- }
-
- if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
+ if( write( fd, header, hwlen ) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
drops++;
total_drops++;
chalk_error( fifo );
if( rdc_ctx != NULL ) {
rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
- rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
+ rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
}
}
}
if( report ) {
if( (now = time( NULL ) ) > next_report ) {
- rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
+ rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
fflush( stdout );
logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
next_report = now + report;
count = 0;
drops = 0;
+ errors = 0;
fflush( stdout );
}
}
- } while( FOREVER ); // forever allows for escape during unit testing
+
+ if( ! FOREVER ) { // allow escape during unit tests; compiled out othewise, but sonar won't see that
+ free( rdc_buf );
+ break; // sonar grumbles if we put FOREVER into the while; maddening
+ }
+ } while( 1 );
}
Given a buffer and length, along with the message type, look up the fifo and write
the buffer. Returns 0 on error; 1 on success.
*/
-extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
+extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) {
mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
fifo_t* fifo; // fifo to chalk counts on
- int state = -1;
+ size_t state = -1;
int fd; // file des to write to
- if( plen <= 0 ) {
+ if( plen <= 0 || payload == NULL ) {
return 1;
}
fd = suss_fifo( ctx, mtype, WRITER, &fifo ); // map the message type to an open fd
if( fd >= 0 ) {
state = write( fd, payload, plen );
- }
+ }
- return state == plen;
+ return state == (size_t) plen;
}