This change attempts to address the bugs and smells in the listener.
A fair few of the "bugs" were unit test coverage related (passing a
nil pointer) and to avoid those the test code was moved into a new
test directory, but it might need to be overtly excluded inthe
CI setup. The other bugs were addressed.
Issue-ID: RIC-632
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ifbc9234e4408db3cb7111dc4a4ad9e9e268341c9
add_test(
NAME drive_listener_tests
COMMAND bash run_unit_test.ksh CMBUILD=${CMAKE_CURRENT_BINARY_DIR}
- WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/../sidecars/listener
+ WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/../sidecars/listener_test
)
- #WORKING_DIRECTORY ../sidecars/listener
# Mnemonic: Dockerfile
-# Abstract: Build file which creaes a runtime image for the listener.
+# Abstract: Build file which creaes a runtime image for the listener.
# Building should be as simple as:
#
# docker build -f sidecars/listener/Dockerfile -t mc_listener:xx.xx .
RUN dpkg -i rmr_${RMR_VER}_amd64.deb
RUN dpkg -i rmr-dev_${RMR_VER}_amd64.deb
-RUN mkdir /playpen/bin /playpen/src
+RUN mkdir /playpen/bin /playpen/listener /playpen/listener/test
ARG SRC=.
-COPY ${SRC}/*.ksh ${SRC}/Makefile ${SRC}/*.h ${SRC}/*.c /playpen/src/
-COPY ${SRC}/verify_replay.sh ${SRC}/verify.sh /playpen/src/
+COPY ${SRC}/*.ksh ${SRC}/Makefile ${SRC}/*.h ${SRC}/*.c /playpen/listener/
+COPY ${SRC}/verify_replay.sh ${SRC}/verify.sh /playpen/listener/
+
+ARG TEST=${SRC}/test
+COPY ${TEST}/*.ksh ${TEST}/Makefile ${TEST}/*.h ${TEST}/*.c /playpen/listener/test/
# Build all binaries; verify scripts expect them to be in bin, so we must copy too
#
ENV LD_LIBRARY_PATH=/usr/local/lib64:/usr/local/lib
ENV C_INCLUDE_PATH=/usr/local/include
-RUN cd /playpen/src/; make -B all; ls -al mc_listener; cp mc_listener sender pipe_reader rdc_replay rdc_extract /playpen/bin/
+RUN cd /playpen/listener \
+ && make -B all \
+ && ls -al mc_listener \
+ && cp mc_listener sender pipe_reader rdc_replay rdc_extract /playpen/bin/
# Run unit tests. If they don't pass the build fails here. Tests can be run from src, but expect binaries in bin
# so that they can be run in the final image as well.
#
-ENV PATH /playpen/bin:/playpen/src:$PATH
-RUN cd /playpen/src/; ./run_unit_test.ksh
-RUN /playpen/src/verify.sh; /playpen/src/verify_replay.sh
+ENV PATH /playpen/bin:/playpen/listener:$PATH
+RUN cd /playpen/listener/test/; ./run_unit_test.ksh
+RUN cd /playpen/listener/; /playpen/listener/verify.sh; /playpen/listener/verify_replay.sh
# ----- final, smaller image ----------------------------------
# bash doesn't cut it for run_replay so grab a real shell and clean up as much as we can
RUN apt-get update; apt-get install -y ksh
-RUN rm -fr /var/lib/apt/lists
+RUN rm -fr /var/lib/apt/lists
# must have rmr runtime to get health check etc
COPY --from=buildenv /playpen/rmr_*.deb /tmp/
RUN mkdir -p /playpen/bin
COPY --from=buildenv /usr/local/lib/* /usr/local/lib/
-COPY --from=buildenv /playpen/src/mc_listener /playpen/src/sender /playpen/src/pipe_reader /playpen/src/rdc_replay /playpen/src/rdc_extract /playpen/bin/
+COPY --from=buildenv /playpen/listener/mc_listener /playpen/listener/sender /playpen/listener/pipe_reader /playpen/listener/rdc_replay /playpen/listener/rdc_extract /playpen/bin/
COPY ${SRC}/verify_replay.sh ${SRC}/verify.sh ${SRC}/run_replay.sh ${SRC}/help /playpen/bin/
ENV PATH=/playpen/bin:$PATH
# with the latest builder support for NNG was dropped, so we must build only SI95
# based applications; the _si suffix has been dropped
-binaries = mc_listener
+binaries = mc_listener
adjuncts = rdc_replay rdc_extract
testers = sender pipe_reader sender
-test_progs = sender unit_test pipe_reader
+test_progs = sender pipe_reader
lib_obj = mcl.o rdc.o
lib_h = mcl.h
mc_listener: mc_listener.c libmcl.a
gcc mc_listener.c -o mc_listener -L. -lmcl -lrmr_si -lm -lpthread
-# ---- testing stuff -----------------------------------------------------------------
-tests: $(test_progs)
+# ---- adjunct tools -----------------------------------------------------------------
+rdc_replay: rdc_replay.c libmcl.a
+ gcc rdc_replay.c -o rdc_replay -L. -lmcl -lrmr_si -lpthread -lm
+rdc_extract: rdc_extract.c libmcl.a
+ gcc rdc_extract.c -o rdc_extract -L. -lmcl -lrmr_si -lpthread -lm
+
+# ------- container verification programmes -------------------------------------------
sender : sender.c
gcc sender.c -o sender -lrmr_si -lm -lpthread
pipe_reader : pipe_reader.c libmcl.a
gcc pipe_reader.c -o pipe_reader -L. -lmcl -lrmr_si -lm -lpthread
-unit_test: unit_test.c mcl.c
- gcc -g $(coverage_opts) unit_test.c -o unit_test -lrmr_si -lm -lpthread
-
-# ---- adjunct tools -----------------------------------------------------------------
-rdc_replay: rdc_replay.c libmcl.a
- gcc rdc_replay.c -o rdc_replay -L. -lmcl -lrmr_si -lpthread -lm
-
-rdc_extract: rdc_extract.c libmcl.a
- gcc rdc_extract.c -o rdc_extract -L. -lmcl -lrmr_si -lpthread -lm
-
# ---- housekeeping stuff -------------------------------------------------------------
# remove only intermediates
clean:
char* suffix = ".rdc";
char* done = NULL;
- if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
- if( ep != NULL && atoi( ep ) == 0 ) {
- logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
- return NULL;
- }
+ if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) { // exists and is 0
+ logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" );
+ return NULL;
}
if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
value = atoi( ep );
logit( LOG_INFO, "setting frequency: %d", value );
- rdc_set_freq( ctx, value );
+ rdc_set_freq( ctx, value );
}
return ctx;
}
<delim><len><timestamp>
Field lengths (bytes) are:
- delim 4
+ delim 4
len 8 (7 digits + 0)
timestamp 16 (15 digits + 0)
-
+
Timestamp is a single unsigned long long in ASCII; ms since epoch.
If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
- the timestamp generated will be 1570113591103.
+ the timestamp generated will be 1570113591103.
- The lenght and timestamp fields in the header are zero terminated so
+ The lenght and timestamp fields in the header are zero terminated so
they can be parsed as a string (atoi etc).
*/
static char* build_hdr( int len, char* dest, int dest_len ) {
/*
Callback function driven to traverse the symtab and generate the
- counts for each fifo.
+ counts for each fifo. Sonar will complain about unused parameters which
+ are normal for callbacks. Further, sonar will grumble about st, and entry
+ not being const; we can't unless RMR proto for the callback changes.
*/
static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
fifo_t* fifo;
/*
Create the context.
*/
-extern void* mcl_mk_context( char* dir ) {
+extern void* mcl_mk_context( const char* dir ) {
mcl_ctx_t* ctx;
if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
int len;
int need = MCL_EXHDR_SIZE; // total needed
int dneed; // delimieter needed
- int rlen;
char* rp; // read position in buf
len = read( fd, buf, need );
}
while( TRUE ) {
- if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
+ if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
rp = buf + len;
dneed = strlen( MCL_DELIM ) - len;
return;
}
- while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
+ while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
}
-
- need = MCL_EXHDR_SIZE - len;
}
}
int need;
char wbuf[4096];
mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
- fifo_t* fref = NULL; // the fifo struct we found
if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
errno = EINVAL;
*timestamp = 0;
}
- len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream
+ read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 bytes as there isn't a way to sync the old stream
msg_len = need = atoi( wbuf );
}
}
if( msg_len > got ) { // we must ditch rest of this message
- need = msg_len = got;
+ need = msg_len - got;
while( need > 0 ) {
len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
need -= len;
timestamp.
The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
- message. When the health check message is received it is responded to
+ message. When the health check message is received it is responded to
with the current state of processing (ok or err).
*/
extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
fifo_t* fifo; // fifo to chalk counts on
rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
char header[128]; // header we'll pop in front of the payload
- int state;
int fd; // file des to write to
long long total = 0; // total messages received and written
long long total_drops = 0; // total messages received and written
long count = 0; // messages received and written during last reporting period
long errors = 0; // unsuccessful payload writes
- long drops; // number of drops
+ long drops = 0; // number of drops
time_t next_report = 0; // we'll report every 2 seconds if report is true
time_t now;
- int hwlen; // write len for header
+ size_t hwlen; // write len for header
void* rdc_ctx = NULL; // raw data capture context
void* rdc_buf = NULL; // capture buffer
mbuf->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
mbuf->sub_id = -1;
mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE ); // ensure payload is large enough
- strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
- rmr_rts_msg( ctx->mrc, mbuf );
+ if( mbuf->payload != NULL ) {
+ strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
+ rmr_rts_msg( ctx->mrc, mbuf );
+ }
continue;
}
-
+
if( mbuf->len > 0 ) {
+ if( long_hdr ) {
+ build_hdr( mbuf->len, header, sizeof( header ) );
+ hwlen = MCL_EXHDR_SIZE;
+ } else {
+ snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
+ hwlen = MCL_LEN_SIZE;
+ }
+
fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
if( fd >= 0 ) {
- if( long_hdr ) {
- build_hdr( mbuf->len, header, sizeof( header ) );
- hwlen = MCL_EXHDR_SIZE;
- } else {
- snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
- hwlen = MCL_LEN_SIZE;
- }
-
- if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
+ if( write( fd, header, hwlen ) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
drops++;
total_drops++;
chalk_error( fifo );
if( rdc_ctx != NULL ) {
rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
- rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
+ rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
}
}
}
if( report ) {
if( (now = time( NULL ) ) > next_report ) {
- rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
+ rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
fflush( stdout );
logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
next_report = now + report;
count = 0;
drops = 0;
+ errors = 0;
fflush( stdout );
}
}
- } while( FOREVER ); // forever allows for escape during unit testing
+
+ if( ! FOREVER ) { // allow escape during unit tests; compiled out othewise, but sonar won't see that
+ free( rdc_buf );
+ break; // sonar grumbles if we put FOREVER into the while; maddening
+ }
+ } while( 1 );
}
Given a buffer and length, along with the message type, look up the fifo and write
the buffer. Returns 0 on error; 1 on success.
*/
-extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
+extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) {
mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
fifo_t* fifo; // fifo to chalk counts on
- int state = -1;
+ size_t state = -1;
int fd; // file des to write to
- if( plen <= 0 ) {
+ if( plen <= 0 || payload == NULL ) {
return 1;
}
fd = suss_fifo( ctx, mtype, WRITER, &fifo ); // map the message type to an open fd
if( fd >= 0 ) {
state = write( fd, payload, plen );
- }
+ }
- return state == plen;
+ return state == (size_t) plen;
}
//------------ prototypes --------------------------------------------------------------
extern void mcl_fifo_fanout( void* ctx, int report, int long_hdrs );
-extern int mcl_fifo_one( void* ctx, char* payload, int plen, int mtype );
+extern int mcl_fifo_one( void* ctx, const char* payload, int plen, int mtype );
extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout );
-extern void* mcl_mk_context( char* dir );
+extern void* mcl_mk_context( const char* dir );
extern int mcl_fifo_read1( void* vctx, int mtype, char* ubuf, int ublen, int long_hdr );
extern int mcl_fifo_tsread1( void* vctx, int mtype, char* ubuf, int ublen, int long_hdr, char* timestamp );
extern int mcl_set_sigh( );
Where <mtype> is the message type of the message received and
<len> is the length of the data that was written to the FIFO.
-
+
Date: 06 Oct 2019
Author: E. Scott Daniels
*/
#include <stdarg.h>
-#include <errno.h>
+#include <errno.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
/*
A capture buffer. The listener writes FIFO output in two stages, thus
- we provide the ability to initialise a capture with the msg type and
+ we provide the ability to initialise a capture with the msg type and
the MRL header, then to write the payload using this saved data. The
idea is to not require the caller to save the header.
*/
Copy and unlink old file is successful. During writing the file mode will
be write only for the owner (0200). If the mode passed in is not 0, then
just prior to renaming the file to 'new', the mode will be changed. If
- mode is 0, then we assume the caller will change the file mode when
+ mode is 0, then we assume the caller will change the file mode when
appropriate.
- There seems to be an issue with some collectors and thus it is required
+ There seems to be an issue with some collectors and thus it is required
to initially name the file with a leading dot (.) until the file is closed
and ready to be read by external processes (marking it write-only seems
not to discourage them from trying!).
int state;
int remain; // number of bytes remaining to write
-
+
errno = 0;
if( (rfd = open( old, O_RDONLY )) < 0 ) {
logit( LOG_ERR, "copy: open src for copy failed: %s: %s", old, strerror( errno ) );
} else {
snprintf( tfname, len, ".%s", wbuf ); // no path, just add leading .
}
+ free( wbuf );
//logit( LOG_INFO, "copy: creating file in tmp filename: %s", tfname );
if( (wfd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0200 )) < 0 ) {
}
}
+ free( tfname );
return state < 0 ? -1 : 0;
}
Attempt to rename (move) the old file to the new file. If that fails with
the error EXDEV (not on same device), then we will copy the file. All
other errors returned by the rename() command are considered fatal and
- copy is not attempted. Returns 0 on success, -1 with errno set on
+ copy is not attempted. Returns 0 on success, -1 with errno set on
failure. See man page for rename for possible errno values and meanings.
*/
static int mvocp( char* old, char* new ) {
return copy_unlink( old, new, 0644 );
}
-
+
return 0;
}
rdc_ctx_t* ctx;
ctx = (rdc_ctx_t *) vctx;
-
-
+
+
if( ctx == NULL ) {
return -1;
}
/*
- Initialise the raw data collection facility. The two directories, staging and final, may be the
+ Initialise the raw data collection facility. The two directories, staging and final, may be the
same, and if different they _should_ reside on the same filesystem such that a move
(rename) operation is only the change in the directory and does not involve the copy
of bytes.
} else {
ctx->fdir = strdup( fdir );
}
-
+
}
if( suffix ) {
/*
Allow the file rotation frequency to be set to something other
- than the default. A minimum of 10s is enforced, but there is
+ than the default. A minimum of 10s is enforced, but there is
no maximum.
*/
extern void rdc_set_freq( void* vctx, int freq ) {
rdc_ctx_t* ctx;
ctx = (rdc_ctx_t *) vctx;
-
+
if( ctx != NULL && freq >= 10 ) {
ctx->frequency = freq;
logit( LOG_INFO, "(rdc) file roll frequency set to %d seconds", ctx->frequency );
if( ctx->fd < 0 ) {
rdc_open( ctx ); // need a file, get it open
- }
+ }
snprintf( header, sizeof( header ), "@RDC%07d*%07d", cb->mtype, cb->uhlen + len );
write( ctx->fd, header, 20 );
We save the message type, and will use that and the user header length and payload
length on write to create the complete RDC header.
*/
-extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
+extern void* rdc_init_buf( int mtype, char* uheader, int uhlen, void* vcb ) {
cap_buf_t* cb;
cb = (cap_buf_t *) vcb;
errno = ENOMEM;
return NULL;
}
- }
+ }
cb->mtype = mtype;
if( uhlen > sizeof( cb->uheader ) ) {
#ifdef SELF_TEST
/*
- Run some quick checks which require the directories in /tmp to exist, and some
+ Run some quick checks which require the directories in /tmp to exist, and some
manual verification on the part of the tester.
*/
int main( ) {
void* cb = NULL; // capture buffere
char* uheader;
char* payload;
- int i;
+ int i;
ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", NULL ); // does not create done files
//ctx = rdc_init( "/tmp/rdc/stage", "/tmp/rdc/final", ".rdc", ".done" ); // will create a "done" file
--- /dev/null
+# vim: ts=4 sw=4 noet:
+#----------------------------------------------------------------------------------
+#
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#---------------------------------------------------------------------------------
+
+# this make file assuems that both NNG and RMR are installed and that the variables
+# LD_LIBRARY_PATH, LIBRARY_PATH are set correctly.
+
+testers = sender pipe_reader sender
+
+test_progs = sender unit_test pipe_reader
+lib_obj = mcl.o rdc.o
+lib_h = mcl.h
+
+coverage_opts = -ftest-coverage -fprofile-arcs
+
+# make with no parms should build tests
+all: tests
+
+
+# ---- testing stuff -----------------------------------------------------------------
+tests: $(test_progs)
+
+
+unit_test: unit_test.c ../mcl.c ../rdc.c
+ gcc -g $(coverage_opts) unit_test.c -o unit_test -lrmr_si -lm -lpthread
+
+# ---- housekeeping stuff -------------------------------------------------------------
+# remove only intermediates
+clean:
+ rm -f *.o *.gcda *.gcno *.gcov
+
+# remove anything that can be rebuilt
+nuke: clean
+ rm -f *mcl.a $(test_progs)
+
# woudl do (but seems not to).
#
function ensure_pkgs {
+ if (( no_rmr_load ))
+ then
+ return
+ fi
+
if (( force_rmr_load )) || [[ -d /usr/local/include/rmr ]]
then
echo "[INFO] found RMR installed in /usr/local"
do
case $1 in
-f) force_rmr_load=1;;
+ -N) no_rmr_load=1;; # for local testing
*) echo "unrecognised option: $1"
exit 1
fi
-show_coverage.ksh unit_test.c # compute coverage and generate .gcov files
+./show_coverage.ksh unit_test.c # compute coverage and generate .gcov files
echo "Coverage with discounting (raw values in parens)"
./discount_chk.ksh $(ls *gcov|egrep -v "^test_|unit_test.c")
rm -f *test*.gcov
fi
+cp *.gcov ../ # the CI job defines the listener dir as the spot to find these, so put them there
+
rm -f /tmp/PID$$.*
exit $rc
Mnemonic: unit_test.c
Abstract: Basic unit tests for the mc listener.
Date: 22 August 2019
- Author: E. Scott Daniels
+ Author: E. Scott Daniels
*/
#define FOREVER 0 // allows forever loops in mcl code to escape after one loop
#include "test_rmr_em.c" // emulated rmr functions (for receives)
// this/these are what we are testing; include them directly (must be after forever def)
-#include "mcl.c"
-#include "rdc.c"
+#include "../mcl.c"
+#include "../rdc.c"
/*
Set up env things for the rdc setup call.
char* bp;
void* buf;
int state;
- char timestamp[1024]; // read will put a timestamp here
+ char timestamp[1024]; // read will put a timestamp here
if( argc > 1 ) {
dname = argv[1];
}
-
+
+ setenv( "MCL_RDC_ENABLE", "0", 1 ); /// test disabled mode for coverage
+ setup_rdc( );
+
set_env(); // set env that setup_rdc() looks for
ctx = mcl_mk_context( dname ); // allocate the context
// under test, the FOREVER = 0 keeps fanout from blocking; drive several times to cover all cases
mcl_fifo_fanout( ctx, 5, 1 ); // first rmr receive call will simulate a timeout
mcl_fifo_fanout( ctx, 5, 1 ); // second receive simulates a health check
- mcl_fifo_fanout( ctx, 5, 1 ); // 3-n return alternating timeout messages; drive so that
+ mcl_fifo_fanout( ctx, 5, 1 ); // 3-n return alternating timeout messages; drive so that
mcl_fifo_fanout( ctx, 5, 1 ); // we will have several land in the FIFO
mcl_fifo_fanout( ctx, 5, 1 );
mcl_fifo_fanout( ctx, 5, 1 );