#include <rmr/rmr.h>
#include <rmr/rmr_symtab.h>
+#include <rmr/RIC_message_types.h>
#include "mcl.h"
dest is nil. The header is of the form:
<delim><len><timestamp>
+ Field lengths (bytes) are:
+ delim 4
+ len 8 (7 digits + 0)
+ timestamp 16 (15 digits + 0)
+
+
Timestamp is a single unsigned long long in ASCII; ms since epoch.
If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
- the timestamp generated will be 1570113591103.
+ the timestamp generated will be 1570113591103.
+
+ The lenght and timestamp fields in the header are zero terminated so
+ they can be parsed as a string (atoi etc).
*/
static char* build_hdr( int len, char* dest, int dest_len ) {
struct timespec ts; // time just before call executed
if( dest == NULL ) {
- dest_len = 48;
+ dest_len = MCL_EXHDR_SIZE + 2; // more than enough room
dest = (char *) malloc( sizeof( char ) * dest_len );
} else {
- if( dest_len < 28 ) { // shouldn't happen, but take no chances
+ if( dest_len < MCL_EXHDR_SIZE ) { // shouldn't happen, but take no chances
memset( dest, 0, dest_len );
return NULL;
}
memset( dest, 0, dest_len );
clock_gettime( CLOCK_REALTIME, &ts );
- sprintf( dest, "%s%07d", MCL_DELIM, len );
- sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
+ snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
+ snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
return dest;
}
read_header( fd, wbuf );
msg_len = need = atoi( wbuf + MCL_LEN_OFF ); // read the length
if( timestamp ) {
- strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
+ strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
}
} else {
if( timestamp != NULL ) { // won't be there, but ensure it's not garbage
If long_hdr is true, then we geneate an extended header with a delimiter and
timestamp.
+
+ The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
+ message. When the health check message is received it is responded to
+ with the current state of processing (ok or err).
*/
-extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
+extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
fifo_t* fifo; // fifo to chalk counts on
rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
do {
mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
- if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0 ) {
- fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
- if( fd >= 0 ) {
- if( long_hdr ) {
- build_hdr( mbuf->len, header, sizeof( header ) );
- hwlen = MCL_EXHDR_SIZE;
- } else {
- snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
- hwlen = MCL_LEN_SIZE;
- }
+ if( mbuf != NULL && mbuf->state == RMR_OK ) {
+ if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
+ mbuf->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
+ mbuf->sub_id = -1;
+ mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE ); // ensure payload is large enough
+ strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
+ rmr_rts_msg( ctx->mrc, mbuf );
+ continue;
+ }
+
+ if( mbuf->len > 0 ) {
+ fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
+ if( fd >= 0 ) {
+ if( long_hdr ) {
+ build_hdr( mbuf->len, header, sizeof( header ) );
+ hwlen = MCL_EXHDR_SIZE;
+ } else {
+ snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+ hwlen = MCL_LEN_SIZE;
+ }
- if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
- drops++;
- total_drops++;
- chalk_error( fifo );
- } else {
- if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
- errors++;
+ if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
+ drops++;
+ total_drops++;
chalk_error( fifo );
} else {
- chalk_ok( fifo );
- count++;
- total++;
+ if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
+ errors++;
+ chalk_error( fifo );
+ } else {
+ chalk_ok( fifo );
+ count++;
+ total++;
+ }
}
}
- }
- if( rdc_ctx != NULL ) {
- rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
- rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
+ if( rdc_ctx != NULL ) {
+ rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
+ rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
+ }
}
}