Tweak unit tests to save coverage in common dir
[ric-app/mc.git] / sidecars / listener / src / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40
41
42 #include <rmr/rmr.h>
43 #include <rmr/rmr_symtab.h>
44 #include <rmr/RIC_message_types.h>
45
46 #include "mcl.h"
47
48 #ifndef FOREVER
49 #define FOREVER 1
50 #endif
51
52 #define READER 0
53 #define WRITER 1
54
55 #define TRUE    1
56 #define FALSE   0
57
58 /*
59         Information about one file descriptor. This is pointed to by the hash
60         such that the message type can be used as a key to look up the fifo's
61         file descriptor.
62 */
63 typedef struct {
64         int     fd;                                     // open fdes
65         int key;                                // symtab key
66         long long wcount;               // number of writes
67         long long drops;                // number dropped
68
69         long long wcount_rp;    // number of writes during last reporting period
70         long long drops_rp;             // number dropped during last reporting period
71 } fifo_t;
72
73 /*
74         Our conext.  Pointers to the read and write hash tables (both keyed on the message
75         type), the message router (RMR) context, and other goodies.
76 */
77 typedef struct {
78         void*   mrc;                            // the message router's context
79         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
80         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
81         char*   fifo_dir;                       // directory where we open fifos
82
83 } mcl_ctx_t;
84
85 // -------- private -------------------------------------------------------
86
87
88 /*
89         Set up for raw data capture. We look for directory overriedes from
90         environment variables, and then invoke the rdc_init() to actually
91         set things up.
92 */
93 static void* setup_rdc() {
94         void*   ctx;
95         int             value;                                                  // value computed for something
96         char*   ep;                                                             // pointer to environment var
97         char*   sdir = "/tmp/rdc/stage";                // default directory names
98         char*   fdir = "/tmp/rdc/final";
99         char*   suffix = ".rdc";
100         char*   done = NULL;
101
102         if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL && atoi( ep ) == 0 ) {                                    // exists and is 0
103                 logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting (MCL_RDCENABLE=0)\n" );
104                 return NULL;
105         }
106
107         if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
108                 sdir = ep;
109         } else {
110                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
111                 mkdir( sdir, 0755 );
112         }
113
114         if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
115                 fdir = ep;
116         } else {
117                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
118                 mkdir( fdir, 0755 );
119         }
120
121         if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
122                 suffix = ep;
123         }
124
125         if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
126                 done = ep;
127         }
128
129         ctx = rdc_init( sdir, fdir, suffix, done );
130         if( ctx == NULL ) {
131                 logit( LOG_ERR, "rdc_init did not generate a context" );
132         } else {
133                 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
134                 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
135         }
136
137         if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
138                 value = atoi( ep );
139                 logit( LOG_INFO, "setting frequency: %d", value );
140                 rdc_set_freq( ctx, value );
141         }
142         return ctx;
143 }
144
145 /*
146         Builds an extended header in the buffer provided, or allocates a new buffer if
147         dest is nil. The header is of the form:
148                 <delim><len><timestamp>
149
150         Field lengths (bytes) are:
151                 delim           4     
152                 len                     8       (7 digits + 0)
153                 timestamp       16  (15 digits + 0)
154
155
156         Timestamp is a single unsigned long long in ASCII; ms since epoch.
157         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
158         the timestamp generated will be 1570113591103.
159
160         The lenght and timestamp fields in the header are zero terminated so
161         they can be parsed as a string (atoi etc).
162 */
163 static char* build_hdr( int len, char* dest, int dest_len ) {
164         struct timespec ts;         // time just before call executed
165
166         if( dest == NULL ) {
167                 dest_len = MCL_EXHDR_SIZE + 2;                  // more than enough room
168                 dest = (char *) malloc( sizeof( char ) * dest_len );
169         } else {
170                 if( dest_len < MCL_EXHDR_SIZE ) {               // shouldn't happen, but take no chances
171                         memset( dest, 0, dest_len );
172                         return NULL;
173                 }
174         }
175
176         memset( dest, 0, dest_len );
177
178         clock_gettime( CLOCK_REALTIME, &ts );
179         snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
180         snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
181
182         return dest;
183 }
184
185 /*
186         Build a file name and open. The io_direction is either READER or
187         WRITER.  For a writer we must 'trick' the system into allowing us
188         to open a pipe for writing in non-blocking mode so that we can
189         report on drops (messages we couldn't write because there was no
190         reader).  The trick is to open a reader on the pipe so that when
191         we open the writer there's a reader and the open won't fail. As
192         soon as we have the writer open, we can close the junk reader.
193
194         If the desired fifo does not exist, it is created.
195 */
196 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
197         char    wbuf[1024];
198         int             fd;                                     // real file des
199         int             jfd = -1;                       // junk file des
200         int             state;
201
202         if( ctx == NULL || mtype < 0 ) {
203                 return -1;
204         }
205
206         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
207
208         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
209         if( state != 0 && errno != EEXIST ) {
210                 logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
211                 return -1;
212         }
213
214         if( io_dir == READER ) {
215                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
216                 if( fd < 0 ) {
217                         logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
218                 }
219         } else {
220                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
221                 if( jfd < 0 ) {
222                         logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
223                         return -1;
224                 }
225
226                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
227                 if( fd < 0 ) {
228                         logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
229                 }
230
231                 close( jfd );                   // should be safe to close this
232         }
233
234
235         return fd;
236 }
237
238 /*
239         Given a message type, return the file des of the fifo that
240         the payload should be written to.        Returns the file des, or -1
241         on error. When sussing out a read file descriptor this will
242         block until there is a fifo for the message type which is
243         open for reading.
244
245         If fref is not nil, then a pointer to the fifo info block is returned
246         allowing for direct update of counts after the write.
247 */
248 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
249         fifo_t* fifo = NULL;
250         void*   hash;
251
252         if( ctx == NULL ) {
253                 if( fref != NULL ) {
254                         *fref = NULL;
255                 }
256                 return -1;
257         }
258
259         if( io_dir == READER ) {                // with an integer key, we need two hash tables
260                 hash = ctx->rd_hash;
261         } else {
262                 hash = ctx->wr_hash;
263         }
264
265         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
266                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
267                 if( fifo != NULL ) {
268                         memset( fifo, 0, sizeof( *fifo ) );
269                         fifo->key = mtype;
270                         fifo->fd = open_fifo( ctx, mtype, io_dir );
271                         if( fifo->fd >= 0 ) {                                   // save only on good open
272                                 rmr_sym_map( hash, mtype, fifo );
273                         } else {
274                                 free( fifo );
275                                 fifo = NULL;
276                         }
277                 }
278         }
279
280         if( fref != NULL ) {
281                 *fref = fifo;
282         }
283
284         return fifo == NULL ? -1 : fifo->fd;
285 }
286
287 /*
288         Make marking counts easier in code
289 */
290 static inline void chalk_error( fifo_t* fifo ) {
291         if( fifo != NULL ) {
292                 fifo->drops++;
293                 fifo->drops_rp++;
294         }
295 }
296
297 static inline void chalk_ok( fifo_t* fifo ) {
298         if( fifo != NULL ) {
299                 fifo->wcount++;
300                 fifo->wcount_rp++;
301         }
302 }
303
304 /*
305         Callback function driven to traverse the symtab and generate the
306         counts for each fifo.  Sonar will complain about unused parameters which
307         are normal for callbacks. Further, sonar will grumble about st, and entry
308         not being const; we can't unless RMR proto for the callback changes.
309 */
310 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
311         fifo_t* fifo;
312         int             report_period = 60;
313
314         if( data ) {
315                 report_period = *((int *) data);
316         }
317
318         if( (fifo = (fifo_t *) thing) != NULL ) {
319                 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
320                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
321
322                 fifo->wcount_rp = 0;            // reset the report counts
323                 fifo->drops_rp = 0;
324                 return;                                         // return here to avoid sonar required hack below
325         }
326
327         /*
328                 Sonar doesn't grok the fact that for callback functions some parms are naturally
329                 ignored. So, to eliminate the 5 code smells because we only care about thing, we
330                 have this hack....
331         */
332         if( st == NULL && entry == NULL && name == NULL && data == NULL ) {
333                 fprintf( stderr, "mdcl: all parms to callback stats were nil\n" );
334         }
335 }
336
337 // ---------- public ------------------------------------------------------
338 /*
339         Sets a signal handler for sigpipe so we don't crash if a reader closes the
340         last reading fd on a pipe. We could do this automatically, but if the user
341         programme needs to trap sigpipe too, this gives them the option not to have
342         us interfere.
343 */
344 extern int mcl_set_sigh( ) {
345         signal( SIGPIPE, SIG_IGN );
346 }
347
348 /*
349         "Opens" the interface to RMR such that messages sent to the application will
350         be available via the rmr receive funcitons. This is NOT automatically called
351         by the mk_context function as some applications will be using the mc library
352         for non-RMR, fifo, chores.
353 */
354 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
355         mcl_ctx_t*      ctx;
356         int             announce = 0;
357
358         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
359                 return 0;
360         }
361
362         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
363         if( ctx->mrc == NULL ) {
364                 logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
365                 return 0;
366         }
367
368         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
369                 if( announce <= 0 ) {
370                         logit(  LOG_INFO, "waiting for RMR to show ready" );
371                         announce = 10;
372                 } else {
373                         announce--;
374                 }
375
376                 sleep( 1 );
377         }
378
379         return 1;
380 }
381
382 /*
383         Blocks until a message arives with a good return code or we timeout. Returns the
384         rmr message buffer. Timeout value epxected in seconds.
385 */
386 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
387         mcl_ctx_t*      ctx;
388
389         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
390                 return NULL;
391         }
392
393         if( ctx->mrc == NULL ) {
394                 logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
395                 exit( 1 );
396         }
397
398         do {
399                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
400         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
401
402         return msg;
403 }
404
405 /*
406         Create the context.
407 */
408 extern  void* mcl_mk_context( const char* dir ) {
409         mcl_ctx_t*      ctx;
410
411         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
412                 memset( ctx, 0, sizeof( *ctx ) );
413                 ctx->fifo_dir = strdup( dir );
414                 ctx->wr_hash = rmr_sym_alloc( 1001 );
415                 ctx->rd_hash = rmr_sym_alloc( 1001 );
416
417                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
418                         logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
419                         free( ctx );
420                         return NULL;
421                 }
422         }
423
424         return (void *) ctx;
425 }
426
427 /*
428         Read the header. Best case we read the expected number of bytes, get all
429         of them and find that they start with the delemiter.  Worst case
430         We have to wait for all of the header, or need to synch at the next
431         delimeter. We assume best case most likely and handle it as such.
432 */
433 static void read_header( int fd, char* buf ) {
434         size_t len;
435         size_t need = MCL_EXHDR_SIZE;           // total needed
436         size_t dneed;                                           // delimieter needed
437         char*   rp;                                                     // read position in buf
438
439         len = read( fd, buf, need );
440         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
441                 return;
442         }
443
444         while( TRUE ) {
445                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
446                         rp = buf + len;
447                         dneed = strlen( MCL_DELIM ) - len;
448
449                         while( dneed > 0 ) {
450                                 len = read( fd, rp, dneed );
451                                 dneed -= len;
452                                 rp += len;
453                         }
454                 }
455
456                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
457                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
458                         rp = buf + (MCL_EXHDR_SIZE - need);
459
460                         while( need > 0 ) {
461                                 len = read( fd, rp, need );
462                                 need -= len;
463                                 rp += len;
464                         }
465
466                         return;
467                 }
468
469                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
470                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
471                 }
472         }
473 }
474
475
476 /*
477         Read one record from the fifo that the message type maps to.
478         Writes at max ublen bytes into the ubuf.
479
480         If long_hdrs is true (!0), then we expect that the stream in the fifo
481         has extended headers (<delim><len><time>), and will write the timestamp
482         from the header into the buffer pointed to by timestamp. The buffer is
483         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
484
485         Further, when extended headers are being used, this function will
486         automatically resynchronise if it detects an issue.
487
488         The function could look for the delimiter and automatically detect whether
489         or not extended headers are in use, but if the stream is out of synch on the
490         first read, this cannot be done, so the funciton requires that the caller
491         know that the FIFO contains extended headers.
492 */
493 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
494         int fd;
495         int len;
496         int     msg_len;
497         int     got = 0;                                                // number of bytes we actually got
498         int need;
499         char wbuf[4096];
500         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
501
502         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
503                 errno = EINVAL;
504                 return 0;
505         }
506
507         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
508                 if( long_hdrs ) {
509                         read_header( fd, wbuf );
510                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
511                         if( timestamp ) {
512                                 strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
513                         }
514                 } else {
515                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
516                                 *timestamp = 0;
517                         }
518
519                         read( fd, wbuf, MCL_LEN_SIZE );                                 // we assume we will get all 8 bytes as there isn't a way to sync the old stream
520                         msg_len = need = atoi( wbuf );
521                 }
522
523
524                 if( need > ublen ) {
525                         need = ublen;                                           // cannot give them more than they can take
526                 }
527                 while( need > 0 ) {
528                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
529                         memcpy( ubuf+got, wbuf, len );
530                         got += len;
531                         need -= len;
532                 }
533
534                 if( msg_len > got ) {                                   // we must ditch rest of this message
535                         need = msg_len - got;
536                         while( need > 0 ) {
537                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
538                                 need -= len;
539                         }
540                 }
541
542                 return got;
543         }
544
545         errno = EBADFD;
546         return 0;
547 }
548
549 /*
550         Read one record from the fifo that the message type maps to.
551         Writes at max ublen bytes into the ubuf. If extended headers are in use
552         this function will ignore the timestamp.
553
554         If long_hdrs is true (!0), then we expect that the stream in the fifo
555         has extended headers (<delim><len><time>).
556 */
557 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
558         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
559 }
560
561 /*
562         Read a single message from the FIFO returning it in the caller's buffer. If extended
563         headers are being used, and the caller supplied a timestamp buffer, the timestamp
564         which was in the header will be returned in that buffer.  The return value is the number
565         of bytes in the buffer; 0 indicates an error and errno should be set.
566 */
567 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
568         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
569 }
570
571
572 /*
573         Will read messages and fan them out based on the message type. This should not
574         return and if it does the caller should assume an error.
575
576         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
577         string , followed by that number of 'raw' bytes. The raw bytes are the payload
578         exactly as received.
579
580         The report parameter is the frequency, in seconds, for writing a short
581         status report to stdout. If 0 then it's off.
582
583         If long_hdr is true, then we geneate an extended header with a delimiter and
584         timestamp.
585
586         The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
587         message.  When the health check message is received it is responded to
588         with the current state of processing (ok or err).
589 */
590 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
591         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
592         fifo_t*         fifo;                                   // fifo to chalk counts on
593         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
594         char            header[128];                    // header we'll pop in front of the payload
595         int                     fd;                                             // file des to write to
596         long long       total = 0;                              // total messages received and written
597         long long       total_drops = 0;                // total messages received and written
598         long            count = 0;                              // messages received and written during last reporting period
599         long            errors = 0;                             // unsuccessful payload writes
600         long            drops = 0;                              // number of drops
601         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
602         time_t          now;
603         size_t          hwlen;                                  // write len for header
604         void*           rdc_ctx = NULL;                 // raw data capture context
605         void*           rdc_buf = NULL;                 // capture buffer
606
607         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
608                 logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
609                 errno = EINVAL;
610                 return;
611         }
612
613         if( report < 0 ) {
614                 report = 0;
615         }
616
617         rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
618
619         do {
620                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
621
622                 if( mbuf != NULL && mbuf->state == RMR_OK ) {
623                         if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
624                                 mbuf->mtype = RIC_HEALTH_CHECK_RESP;            // if we're here we are running and all is ok
625                                 mbuf->sub_id = -1;
626                                 mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE );  // ensure payload is large enough
627                                 if( mbuf->payload != NULL ) {
628                                         strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
629                                         rmr_rts_msg( ctx->mrc, mbuf );
630                                 }
631                                 continue;
632                         }
633
634                         if( mbuf->len > 0  ) {
635                                 if( long_hdr ) {
636                                         build_hdr( mbuf->len, header, sizeof( header ) );
637                                         hwlen = MCL_EXHDR_SIZE;
638                                 } else {
639                                         snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
640                                         hwlen = MCL_LEN_SIZE;
641                                 }
642
643                                 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
644                                 if( fd >= 0 ) {
645                                         if( write( fd, header, hwlen ) != hwlen ) {                     // write exactly MCL_LEN_SIZE bytes from the buffer
646                                                 drops++;
647                                                 total_drops++;
648                                                 chalk_error( fifo );
649                                         } else {
650                                                 if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
651                                                         errors++;
652                                                         chalk_error( fifo );
653                                                 } else {
654                                                         chalk_ok( fifo );
655                                                         count++;
656                                                         total++;
657                                                 }
658                                         }
659                                 }
660
661                                 if( rdc_ctx != NULL ) {
662                                         rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
663                                         rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                // write the raw data
664                                 }
665                         }
666                 }
667
668                 if( report ) {
669                         if( (now = time( NULL ) ) > next_report ) {
670                                 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
671                                 fflush( stdout );
672
673                                 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
674                                         total, total_drops, report, count, drops, errors );
675                                 next_report = now + report;
676                                 count = 0;
677                                 drops = 0;
678                                 errors = 0;
679
680                                 fflush( stdout );
681                         }
682                 }
683
684                 if( ! FOREVER ) {                       // allow escape during unit tests; compiled out othewise, but sonar won't see that
685                         free( rdc_buf );
686                         break;                                  // sonar grumbles if we put FOREVER into the while; maddening
687                 }
688         } while( 1 );
689 }
690
691
692 /*
693         Given a buffer and length, along with the message type, look up the fifo and write
694         the buffer. Returns 0 on error; 1 on success.
695 */
696 extern int mcl_fifo_one( void* vctx, const char* payload, int plen, int mtype ) {
697         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
698         fifo_t*         fifo;                                   // fifo to chalk counts on
699         size_t          state = -1;
700         int                     fd;                                             // file des to write to
701
702         if( plen <= 0  || payload == NULL ) {
703                 return 1;
704         }
705
706         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
707                 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
708                 return 0;
709         }
710
711         fd = suss_fifo( ctx, mtype, WRITER, &fifo );            // map the message type to an open fd
712         if( fd >= 0 ) {
713                 state = write( fd, payload, plen );
714         }
715
716         return state == (size_t) plen;
717 }