Add health check to the MC-listener application
[ric-app/mc.git] / sidecars / listener / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40
41
42 #include <rmr/rmr.h>
43 #include <rmr/rmr_symtab.h>
44 #include <rmr/RIC_message_types.h>
45
46 #include "mcl.h"
47
48 #ifndef FOREVER
49 #define FOREVER 1
50 #endif
51
52 #define READER 0
53 #define WRITER 1
54
55 #define TRUE    1
56 #define FALSE   0
57
58 /*
59         Information about one file descriptor. This is pointed to by the hash
60         such that the message type can be used as a key to look up the fifo's
61         file descriptor.
62 */
63 typedef struct {
64         int     fd;                                     // open fdes
65         int key;                                // symtab key
66         long long wcount;               // number of writes
67         long long drops;                // number dropped
68
69         long long wcount_rp;    // number of writes during last reporting period
70         long long drops_rp;             // number dropped during last reporting period
71 } fifo_t;
72
73 /*
74         Our conext.  Pointers to the read and write hash tables (both keyed on the message
75         type), the message router (RMR) context, and other goodies.
76 */
77 typedef struct {
78         void*   mrc;                            // the message router's context
79         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
80         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
81         char*   fifo_dir;                       // directory where we open fifos
82
83 } mcl_ctx_t;
84
85 // -------- private -------------------------------------------------------
86
87
88 /*
89         Set up for raw data capture. We look for directory overriedes from
90         environment variables, and then invoke the rdc_init() to actually
91         set things up.
92 */
93 static void* setup_rdc() {
94         void*   ctx;
95         int             value;                                                  // value computed for something
96         char*   ep;                                                             // pointer to environment var
97         char*   sdir = "/tmp/rdc/stage";                // default directory names
98         char*   fdir = "/tmp/rdc/final";
99         char*   suffix = ".rdc";
100         char*   done = NULL;
101
102         if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
103                 if( ep != NULL  && atoi( ep ) == 0 ) {
104                         logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
105                         return NULL;
106                 }
107         }
108
109         if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
110                 sdir = ep;
111         } else {
112                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
113                 mkdir( sdir, 0755 );
114         }
115
116         if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
117                 fdir = ep;
118         } else {
119                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
120                 mkdir( fdir, 0755 );
121         }
122
123         if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
124                 suffix = ep;
125         }
126
127         if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
128                 done = ep;
129         }
130
131         ctx = rdc_init( sdir, fdir, suffix, done );
132         if( ctx == NULL ) {
133                 logit( LOG_ERR, "rdc_init did not generate a context" );
134         } else {
135                 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
136                 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
137         }
138
139         if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
140                 value = atoi( ep );
141                 logit( LOG_INFO, "setting frequency: %d", value );
142                 rdc_set_freq( ctx, value );     
143         }
144         return ctx;
145 }
146
147 /*
148         Builds an extended header in the buffer provided, or allocates a new buffer if
149         dest is nil. The header is of the form:
150                 <delim><len><timestamp>
151
152         Field lengths (bytes) are:
153                 delim           4      
154                 len                     8       (7 digits + 0)
155                 timestamp       16  (15 digits + 0)
156
157         
158         Timestamp is a single unsigned long long in ASCII; ms since epoch.
159         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
160         the timestamp generated will be 1570113591103. 
161
162         The lenght and timestamp fields in the header are zero terminated so 
163         they can be parsed as a string (atoi etc).
164 */
165 static char* build_hdr( int len, char* dest, int dest_len ) {
166         struct timespec ts;         // time just before call executed
167
168         if( dest == NULL ) {
169                 dest_len = MCL_EXHDR_SIZE + 2;                  // more than enough room
170                 dest = (char *) malloc( sizeof( char ) * dest_len );
171         } else {
172                 if( dest_len < MCL_EXHDR_SIZE ) {               // shouldn't happen, but take no chances
173                         memset( dest, 0, dest_len );
174                         return NULL;
175                 }
176         }
177
178         memset( dest, 0, dest_len );
179
180         clock_gettime( CLOCK_REALTIME, &ts );
181         snprintf( dest, dest_len, "%s%07d", MCL_DELIM, len );
182         snprintf( dest+12, dest_len-13, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
183
184         return dest;
185 }
186
187 /*
188         Build a file name and open. The io_direction is either READER or
189         WRITER.  For a writer we must 'trick' the system into allowing us
190         to open a pipe for writing in non-blocking mode so that we can
191         report on drops (messages we couldn't write because there was no
192         reader).  The trick is to open a reader on the pipe so that when
193         we open the writer there's a reader and the open won't fail. As
194         soon as we have the writer open, we can close the junk reader.
195
196         If the desired fifo does not exist, it is created.
197 */
198 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
199         char    wbuf[1024];
200         int             fd;                                     // real file des
201         int             jfd = -1;                       // junk file des
202         int             state;
203
204         if( ctx == NULL || mtype < 0 ) {
205                 return -1;
206         }
207
208         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
209
210         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
211         if( state != 0 && errno != EEXIST ) {
212                 logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
213                 return -1;
214         }
215
216         if( io_dir == READER ) {
217                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
218                 if( fd < 0 ) {
219                         logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
220                 }
221         } else {
222                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
223                 if( jfd < 0 ) {
224                         logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
225                         return -1;
226                 }
227
228                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
229                 if( fd < 0 ) {
230                         logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
231                 }
232
233                 close( jfd );                   // should be safe to close this
234         }
235
236
237         return fd;
238 }
239
240 /*
241         Given a message type, return the file des of the fifo that
242         the payload should be written to.        Returns the file des, or -1
243         on error. When sussing out a read file descriptor this will
244         block until there is a fifo for the message type which is
245         open for reading.
246
247         If fref is not nil, then a pointer to the fifo info block is returned
248         allowing for direct update of counts after the write.
249 */
250 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
251         fifo_t* fifo = NULL;
252         void*   hash;
253
254         if( ctx == NULL ) {
255                 if( fref != NULL ) {
256                         *fref = NULL;
257                 }
258                 return -1;
259         }
260
261         if( io_dir == READER ) {                // with an integer key, we need two hash tables
262                 hash = ctx->rd_hash;
263         } else {
264                 hash = ctx->wr_hash;
265         }
266
267         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
268                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
269                 if( fifo != NULL ) {
270                         memset( fifo, 0, sizeof( *fifo ) );
271                         fifo->key = mtype;
272                         fifo->fd = open_fifo( ctx, mtype, io_dir );
273                         if( fifo->fd >= 0 ) {                                   // save only on good open
274                                 rmr_sym_map( hash, mtype, fifo );
275                         } else {
276                                 free( fifo );
277                                 fifo = NULL;
278                         }
279                 }
280         }
281
282         if( fref != NULL ) {
283                 *fref = fifo;
284         }
285
286         return fifo == NULL ? -1 : fifo->fd;
287 }
288
289 /*
290         Make marking counts easier in code
291 */
292 static inline void chalk_error( fifo_t* fifo ) {
293         if( fifo != NULL ) {
294                 fifo->drops++;
295                 fifo->drops_rp++;
296         }
297 }
298
299 static inline void chalk_ok( fifo_t* fifo ) {
300         if( fifo != NULL ) {
301                 fifo->wcount++;
302                 fifo->wcount_rp++;
303         }
304 }
305
306 /*
307         Callback function driven to traverse the symtab and generate the
308         counts for each fifo.
309 */
310 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
311         fifo_t* fifo;
312         int             report_period = 60;
313
314         if( data ) {
315                 report_period = *((int *) data);
316         }
317
318         if( (fifo = (fifo_t *) thing) != NULL ) {
319                 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
320                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
321
322                 fifo->wcount_rp = 0;            // reset the report counts
323                 fifo->drops_rp = 0;
324         }
325 }
326
327 // ---------- public ------------------------------------------------------
328 /*
329         Sets a signal handler for sigpipe so we don't crash if a reader closes the
330         last reading fd on a pipe. We could do this automatically, but if the user
331         programme needs to trap sigpipe too, this gives them the option not to have
332         us interfere.
333 */
334 extern int mcl_set_sigh( ) {
335         signal( SIGPIPE, SIG_IGN );
336 }
337
338 /*
339         "Opens" the interface to RMR such that messages sent to the application will
340         be available via the rmr receive funcitons. This is NOT automatically called
341         by the mk_context function as some applications will be using the mc library
342         for non-RMR, fifo, chores.
343 */
344 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
345         mcl_ctx_t*      ctx;
346         int             announce = 0;
347
348         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
349                 return 0;
350         }
351
352         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
353         if( ctx->mrc == NULL ) {
354                 logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
355                 return 0;
356         }
357
358         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
359                 if( announce <= 0 ) {
360                         logit(  LOG_INFO, "waiting for RMR to show ready" );
361                         announce = 10;
362                 } else {
363                         announce--;
364                 }
365
366                 sleep( 1 );
367         }
368
369         return 1;
370 }
371
372 /*
373         Blocks until a message arives with a good return code or we timeout. Returns the
374         rmr message buffer. Timeout value epxected in seconds.
375 */
376 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
377         mcl_ctx_t*      ctx;
378
379         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
380                 return NULL;
381         }
382
383         if( ctx->mrc == NULL ) {
384                 logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
385                 exit( 1 );
386         }
387
388         do {
389                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
390         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
391
392         return msg;
393 }
394
395 /*
396         Create the context.
397 */
398 extern  void* mcl_mk_context( char* dir ) {
399         mcl_ctx_t*      ctx;
400
401         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
402                 memset( ctx, 0, sizeof( *ctx ) );
403                 ctx->fifo_dir = strdup( dir );
404                 ctx->wr_hash = rmr_sym_alloc( 1001 );
405                 ctx->rd_hash = rmr_sym_alloc( 1001 );
406
407                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
408                         logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
409                         free( ctx );
410                         return NULL;
411                 }
412         }
413
414         return (void *) ctx;
415 }
416
417 /*
418         Read the header. Best case we read the expected number of bytes, get all
419         of them and find that they start with the delemiter.  Worst case
420         We have to wait for all of the header, or need to synch at the next
421         delimeter. We assume best case most likely and handle it as such.
422 */
423 static void read_header( int fd, char* buf ) {
424         int len;
425         int need = MCL_EXHDR_SIZE;              // total needed
426         int dneed;                                              // delimieter needed
427         int     rlen;
428         char*   rp;                             // read position in buf
429
430         len = read( fd, buf, need );
431         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
432                 return;
433         }
434
435         while( TRUE ) {
436                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
437                         rp = buf + len;
438                         dneed = strlen( MCL_DELIM ) - len;
439
440                         while( dneed > 0 ) {
441                                 len = read( fd, rp, dneed );
442                                 dneed -= len;
443                                 rp += len;
444                         }
445                 }
446
447                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
448                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
449                         rp = buf + (MCL_EXHDR_SIZE - need);
450
451                         while( need > 0 ) {
452                                 len = read( fd, rp, need );
453                                 need -= len;
454                                 rp += len;
455                         }
456
457                         return;
458                 }
459
460                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
461                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
462                 }
463
464                 need = MCL_EXHDR_SIZE - len;
465         }
466 }
467
468
469 /*
470         Read one record from the fifo that the message type maps to.
471         Writes at max ublen bytes into the ubuf.
472
473         If long_hdrs is true (!0), then we expect that the stream in the fifo
474         has extended headers (<delim><len><time>), and will write the timestamp
475         from the header into the buffer pointed to by timestamp. The buffer is
476         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
477
478         Further, when extended headers are being used, this function will
479         automatically resynchronise if it detects an issue.
480
481         The function could look for the delimiter and automatically detect whether
482         or not extended headers are in use, but if the stream is out of synch on the
483         first read, this cannot be done, so the funciton requires that the caller
484         know that the FIFO contains extended headers.
485 */
486 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
487         int fd;
488         int len;
489         int     msg_len;
490         int     got = 0;                                                // number of bytes we actually got
491         int need;
492         char wbuf[4096];
493         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
494         fifo_t* fref = NULL;                            // the fifo struct we found
495
496         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
497                 errno = EINVAL;
498                 return 0;
499         }
500
501         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
502                 if( long_hdrs ) {
503                         read_header( fd, wbuf );
504                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
505                         if( timestamp ) {
506                                 strncpy( timestamp, wbuf + MCL_TSTAMP_OFF+1, MCL_TSTAMP_SIZE );
507                         }
508                 } else {
509                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
510                                 *timestamp = 0;
511                         }
512
513                         len = read( fd, wbuf, MCL_LEN_SIZE );                   // we assume we will get all 8 as there isn't a way to sync the old stream
514                         msg_len = need = atoi( wbuf );
515                 }
516
517
518                 if( need > ublen ) {
519                         need = ublen;                                           // cannot give them more than they can take
520                 }
521                 while( need > 0 ) {
522                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
523                         memcpy( ubuf+got, wbuf, len );
524                         got += len;
525                         need -= len;
526                 }
527
528                 if( msg_len > got ) {                                   // we must ditch rest of this message
529                         need = msg_len = got;
530                         while( need > 0 ) {
531                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
532                                 need -= len;
533                         }
534                 }
535
536                 return got;
537         }
538
539         errno = EBADFD;
540         return 0;
541 }
542
543 /*
544         Read one record from the fifo that the message type maps to.
545         Writes at max ublen bytes into the ubuf. If extended headers are in use
546         this function will ignore the timestamp.
547
548         If long_hdrs is true (!0), then we expect that the stream in the fifo
549         has extended headers (<delim><len><time>).
550 */
551 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
552         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
553 }
554
555 /*
556         Read a single message from the FIFO returning it in the caller's buffer. If extended
557         headers are being used, and the caller supplied a timestamp buffer, the timestamp
558         which was in the header will be returned in that buffer.  The return value is the number
559         of bytes in the buffer; 0 indicates an error and errno should be set.
560 */
561 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
562         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
563 }
564
565
566 /*
567         Will read messages and fan them out based on the message type. This should not
568         return and if it does the caller should assume an error.
569
570         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
571         string , followed by that number of 'raw' bytes. The raw bytes are the payload
572         exactly as received.
573
574         The report parameter is the frequency, in seconds, for writing a short
575         status report to stdout. If 0 then it's off.
576
577         If long_hdr is true, then we geneate an extended header with a delimiter and
578         timestamp.
579
580         The one message which is NOT pushed into a FIFO is the RIC_HEALTH_CHECK_REQ
581         message.  When the health check message is received it is responded to 
582         with the current state of processing (ok or err).
583 */
584 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
585         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
586         fifo_t*         fifo;                                   // fifo to chalk counts on
587         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
588         char            header[128];                    // header we'll pop in front of the payload
589         int                     state;
590         int                     fd;                                             // file des to write to
591         long long       total = 0;                              // total messages received and written
592         long long       total_drops = 0;                // total messages received and written
593         long            count = 0;                              // messages received and written during last reporting period
594         long            errors = 0;                             // unsuccessful payload writes
595         long            drops;                                  // number of drops
596         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
597         time_t          now;
598         int                     hwlen;                                  // write len for header
599         void*           rdc_ctx = NULL;                 // raw data capture context
600         void*           rdc_buf = NULL;                 // capture buffer
601
602         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
603                 logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
604                 errno = EINVAL;
605                 return;
606         }
607
608         if( report < 0 ) {
609                 report = 0;
610         }
611
612         rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
613
614         do {
615                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
616
617                 if( mbuf != NULL && mbuf->state == RMR_OK ) {
618                         if( mbuf->mtype == RIC_HEALTH_CHECK_REQ ) {
619                                 mbuf->mtype = RIC_HEALTH_CHECK_RESP;            // if we're here we are running and all is ok
620                                 mbuf->sub_id = -1;
621                                 mbuf = rmr_realloc_payload( mbuf, 128, FALSE, FALSE );  // ensure payload is large enough
622                                 strncpy( mbuf->payload, "OK\n", rmr_payload_size( mbuf) );
623                                 rmr_rts_msg( ctx->mrc, mbuf );
624                                 continue;
625                         }
626                         
627                         if( mbuf->len > 0  ) {
628                                 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
629                                 if( fd >= 0 ) {
630                                         if( long_hdr ) {
631                                                 build_hdr( mbuf->len, header, sizeof( header ) );
632                                                 hwlen = MCL_EXHDR_SIZE;
633                                         } else {
634                                                 snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
635                                                 hwlen = MCL_LEN_SIZE;
636                                         }
637
638                                         if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
639                                                 drops++;
640                                                 total_drops++;
641                                                 chalk_error( fifo );
642                                         } else {
643                                                 if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
644                                                         errors++;
645                                                         chalk_error( fifo );
646                                                 } else {
647                                                         chalk_ok( fifo );
648                                                         count++;
649                                                         total++;
650                                                 }
651                                         }
652                                 }
653
654                                 if( rdc_ctx != NULL ) {
655                                         rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
656                                         rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
657                                 }
658                         }
659                 }
660
661                 if( report ) {
662                         if( (now = time( NULL ) ) > next_report ) {
663                         rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
664                                 fflush( stdout );
665
666                                 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
667                                         total, total_drops, report, count, drops, errors );
668                                 next_report = now + report;
669                                 count = 0;
670                                 drops = 0;
671
672                                 fflush( stdout );
673                         }
674                 }
675         } while( FOREVER );                             // forever allows for escape during unit testing
676 }
677
678
679 /*
680         Given a buffer and length, along with the message type, look up the fifo and write
681         the buffer. Returns 0 on error; 1 on success.
682 */
683 extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
684         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
685         fifo_t*         fifo;                                   // fifo to chalk counts on
686         int                     state = -1;
687         int                     fd;                                             // file des to write to
688
689         if( plen <= 0 ) {
690                 return 1;
691         }
692
693         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
694                 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
695                 return 0;
696         }
697
698         fd = suss_fifo( ctx, mtype, WRITER, &fifo );            // map the message type to an open fd
699         if( fd >= 0 ) {
700                 state = write( fd, payload, plen );
701         } 
702
703         return state == plen;
704 }