262486b04a5253c035c296dad2ded2c71b1019a0
[ric-app/mc.git] / sidecars / listener / mcl.c
1 // vim: ts=4 sw=4 noet:
2 /*
3 --------------------------------------------------------------------------------
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10            http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 --------------------------------------------------------------------------------
18 */
19
20 /*
21         Mnemonic:       mcl.c.
22         Abstract:       The mc listener library content. All external functions
23                                 should start with mcl_ and all stderr messages should have
24                                 (mcl) as the first token following the severity indicator.
25
26         Date:           22 August 2019
27         Author:         E. Scott Daniels
28 */
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <time.h>
35 #include <string.h>
36 #include <fcntl.h>
37 #include <signal.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40
41
42 #include <rmr/rmr.h>
43 #include <rmr/rmr_symtab.h>
44
45 #include "mcl.h"
46
47 #define READER 0
48 #define WRITER 1
49
50 #define TRUE    1
51 #define FALSE   0
52
53 /*
54         Information about one file descriptor. This is pointed to by the hash
55         such that the message type can be used as a key to look up the fifo's
56         file descriptor.
57 */
58 typedef struct {
59         int     fd;                                     // open fdes
60         int key;                                // symtab key
61         long long wcount;               // number of writes
62         long long drops;                // number dropped
63
64         long long wcount_rp;    // number of writes during last reporting period
65         long long drops_rp;             // number dropped during last reporting period
66 } fifo_t;
67
68 /*
69         Our conext.  Pointers to the read and write hash tables (both keyed on the message
70         type), the message router (RMR) context, and other goodies.
71 */
72 typedef struct {
73         void*   mrc;                            // the message router's context
74         void*   wr_hash;                        // symtable to look up pipe info based on mt for writing
75         void*   rd_hash;                        // we support reading from pipes, but need a different FD for that
76         char*   fifo_dir;                       // directory where we open fifos
77
78 } mcl_ctx_t;
79
80 // -------- private -------------------------------------------------------
81
82
83 /*
84         Set up for raw data capture. We look for directory overriedes from
85         environment variables, and then invoke the rdc_init() to actually
86         set things upd.
87 */
88 static void* setup_rdc() {
89         void*   ctx;
90         int             value;                                                  // value computed for something
91         char*   ep;                                                             // pointer to environment var
92         char*   sdir = "/tmp/rdc/stage";                // default directory names
93         char*   fdir = "/tmp/rdc/final";
94         char*   suffix = ".rdc";
95         char*   done = NULL;
96
97         if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
98                 if( ep != NULL  && atoi( ep ) == 0 ) {
99                         logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
100                         return NULL;
101                 }
102         }
103
104         if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
105                 sdir = ep;
106         } else {
107                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures here as it could likely exist
108                 mkdir( sdir, 0755 );
109         }
110
111         if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
112                 fdir = ep;
113         } else {
114                 mkdir( "/tmp/rdc", 0755 );                      // we ignore failures again -- very likely it's there
115                 mkdir( fdir, 0755 );
116         }
117
118         if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
119                 suffix = ep;
120         }
121
122         if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
123                 done = ep;
124         }
125
126         ctx = rdc_init( sdir, fdir, suffix, done );
127         if( ctx == NULL ) {
128                 logit( LOG_ERR, "rdc_init did not generate a context" );
129         } else {
130                 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
131                 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
132         }
133
134         if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
135                 value = atoi( ep );
136                 logit( LOG_INFO, "setting frequency: %d", value );
137                 rdc_set_freq( ctx, value );     
138         }
139         return ctx;
140 }
141
142 /*
143         Builds an extended header in the buffer provided, or allocates a new buffer if
144         dest is nil. The header is of the form:
145                 <delim><len><timestamp>
146
147         Timestamp is a single unsigned long long in ASCII; ms since epoch.
148         If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
149         the timestamp generated will be 1570113591103.
150 */
151 static char* build_hdr( int len, char* dest, int dest_len ) {
152         struct timespec ts;         // time just before call executed
153
154         if( dest == NULL ) {
155                 dest_len = 48;
156                 dest = (char *) malloc( sizeof( char ) * dest_len );
157         } else {
158                 if( dest_len < 28 ) {           // shouldn't happen, but take no chances
159                         memset( dest, 0, dest_len );
160                         return NULL;
161                 }
162         }
163
164         memset( dest, 0, dest_len );
165
166         clock_gettime( CLOCK_REALTIME, &ts );
167         sprintf( dest, "%s%07d", MCL_DELIM, len );
168         sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
169
170         return dest;
171 }
172
173 /*
174         Build a file name and open. The io_direction is either READER or
175         WRITER.  For a writer we must 'trick' the system into allowing us
176         to open a pipe for writing in non-blocking mode so that we can
177         report on drops (messages we couldn't write because there was no
178         reader).  The trick is to open a reader on the pipe so that when
179         we open the writer there's a reader and the open won't fail. As
180         soon as we have the writer open, we can close the junk reader.
181
182         If the desired fifo does not exist, it is created.
183 */
184 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
185         char    wbuf[1024];
186         int             fd;                             // real file des
187         int             jfd = -1;                       // junk file des
188         int             state;
189
190         if( ctx == NULL || mtype < 0 ) {
191                 return -1;
192         }
193
194         snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
195
196         state = mkfifo( wbuf, 0660 );           // make the fifo; this will fail if it exists and that's ok
197         if( state != 0 && errno != EEXIST ) {
198                 logit(  LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
199                 return -1;
200         }
201
202         if( io_dir == READER ) {
203                 fd = open( wbuf, O_RDONLY  );                   // just open the reader
204                 if( fd < 0 ) {
205                         logit(  LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
206                 }
207         } else {
208                 jfd = open( wbuf, O_RDWR  | O_NONBLOCK );                       // must have a reader before we can open a non-blocking writer
209                 if( jfd < 0 ) {
210                         logit(  LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
211                         return -1;
212                 }
213
214                 fd = open( wbuf, O_WRONLY  | O_NONBLOCK );                      // this will be our actual writer, in non-blocking mode
215                 if( fd < 0 ) {
216                         logit(  LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
217                 }
218
219                 close( jfd );                   // should be safe to close this
220         }
221
222
223         return fd;
224 }
225
226 /*
227         Given a message type, return the file des of the fifo that
228         the payload should be written to.        Returns the file des, or -1
229         on error. When sussing out a read file descriptor this will
230         block until there is a fifo for the message type which is
231         open for reading.
232
233         If fref is not nil, then a pointer to the fifo info block is returned
234         allowing for direct update of counts after the write.
235 */
236 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
237         fifo_t* fifo;
238         void*   hash;
239
240         if( io_dir == READER ) {                // with an integer key, we nned two hash tables
241                 hash = ctx->rd_hash;
242         } else {
243                 hash = ctx->wr_hash;
244         }
245
246         if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
247                 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
248                 if( fifo == NULL ) {
249                         return -1;
250                 }
251
252                 memset( fifo, 0, sizeof( *fifo ) );
253                 fifo->key = mtype;
254                 fifo->fd = open_fifo( ctx, mtype, io_dir );
255                 rmr_sym_map( hash, mtype, fifo );
256         }
257
258         if( fref != NULL ) {
259                 *fref = fifo;
260         }
261         return fifo->fd;
262 }
263
264 /*
265         Make marking counts easier in code
266 */
267 static inline void chalk_error( fifo_t* fifo ) {
268         if( fifo != NULL ) {
269                 fifo->drops++;
270                 fifo->drops_rp++;
271         }
272 }
273
274 static inline void chalk_ok( fifo_t* fifo ) {
275         if( fifo != NULL ) {
276                 fifo->wcount++;
277                 fifo->wcount_rp++;
278         }
279 }
280
281 /*
282         Callback function driven to traverse the symtab and generate the
283         counts for each fifo.
284 */
285 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
286         fifo_t* fifo;
287         int             report_period = 60;
288
289         if( data ) {
290                 report_period = *((int *) data);
291         }
292
293         if( (fifo = (fifo_t *) thing) != NULL ) {
294                 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
295                         fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
296
297                 fifo->wcount_rp = 0;            // reset the report counts
298                 fifo->drops_rp = 0;
299         }
300 }
301
302 // ---------- public ------------------------------------------------------
303 /*
304         Sets a signal handler for sigpipe so we don't crash if a reader closes the
305         last reading fd on a pipe. We could do this automatically, but if the user
306         programme needs to trap sigpipe too, this gives them the option not to have
307         us interfere.
308 */
309 extern int mcl_set_sigh( ) {
310         signal( SIGPIPE, SIG_IGN );
311 }
312
313 /*
314         "Opens" the interface to RMR such that messages sent to the application will
315         be available via the rmr receive funcitons. This is NOT automatically called
316         by the mk_context function as some applications will be using the mc library
317         for non-RMR, fifo, chores.
318 */
319 extern int mcl_start_listening( void* vctx,  char* port, int wait4ready ) {
320         mcl_ctx_t*      ctx;
321         int             announce = 0;
322
323         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
324                 return 0;
325         }
326
327         ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE );     // start your engines!
328         if( ctx->mrc == NULL ) {
329                 logit(  LOG_CRIT, "start listening: unable to initialise RMr" );
330                 return 0;
331         }
332
333         while( wait4ready && ! rmr_ready( ctx->mrc ) ) {                                // only senders need to wait
334                 if( announce <= 0 ) {
335                         logit(  LOG_INFO, "waiting for RMR to show ready" );
336                         announce = 10;
337                 } else {
338                         announce--;
339                 }
340
341                 sleep( 1 );
342         }
343
344         return 1;
345 }
346
347 /*
348         Blocks until a message arives with a good return code or we timeout. Returns the
349         rmr message buffer. Timeout value epxected in seconds.
350 */
351 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
352         mcl_ctx_t*      ctx;
353
354         if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
355                 return NULL;
356         }
357
358         if( ctx->mrc == NULL ) {
359                 logit(  LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
360                 exit( 1 );
361         }
362
363         do {
364                 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 );                           // wait for next
365         } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
366
367         return msg;
368 }
369
370 /*
371         Create the context.
372 */
373 extern  void* mcl_mk_context( char* dir ) {
374         mcl_ctx_t*      ctx;
375
376         if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
377                 memset( ctx, 0, sizeof( *ctx ) );
378                 ctx->fifo_dir = strdup( dir );
379                 ctx->wr_hash = rmr_sym_alloc( 1001 );
380                 ctx->rd_hash = rmr_sym_alloc( 1001 );
381
382                 if( ctx->wr_hash == NULL  || ctx->rd_hash == NULL ) {
383                         logit(  LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
384                         free( ctx );
385                         return NULL;
386                 }
387         }
388
389         return (void *) ctx;
390 }
391
392 /*
393         Read the header. Best case we read the expected number of bytes, get all
394         of them and find that they start with the delemiter.  Worst case
395         We have to wait for all of the header, or need to synch at the next
396         delimeter. We assume best case most likely and handle it as such.
397 */
398 static void read_header( int fd, char* buf ) {
399         int len;
400         int need = MCL_EXHDR_SIZE;              // total needed
401         int dneed;                                              // delimieter needed
402         int     rlen;
403         char*   rp;                             // read position in buf
404
405         len = read( fd, buf, need );
406         if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {       // best case, most likely
407                 return;
408         }
409
410         while( TRUE ) {
411                 if( len < strlen( MCL_DELIM ) ) {               // must get at least enough bytes to check delim
412                         rp = buf + len;
413                         dneed = strlen( MCL_DELIM ) - len;
414
415                         while( dneed > 0 ) {
416                                 len = read( fd, rp, dneed );
417                                 dneed -= len;
418                                 rp += len;
419                         }
420                 }
421
422                 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) {      // have a good delimiter, just need to wait for bytes
423                         need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
424                         rp = buf + (MCL_EXHDR_SIZE - need);
425
426                         while( need > 0 ) {
427                                 len = read( fd, rp, need );
428                                 need -= len;
429                                 rp += len;
430                         }
431
432                         return;
433                 }
434
435                 while( buf[0] != MCL_DELIM[0] ) {       // wait for a recognised start byte to be read (may cause an additional message drop
436                         len = read( fd, buf, 1 );               // because we ignore start byte that might be in the buffer)
437                 }
438
439                 need = MCL_EXHDR_SIZE - len;
440         }
441 }
442
443
444 /*
445         Read one record from the fifo that the message type maps to.
446         Writes at max ublen bytes into the ubuf.
447
448         If long_hdrs is true (!0), then we expect that the stream in the fifo
449         has extended headers (<delim><len><time>), and will write the timestamp
450         from the header into the buffer pointed to by timestamp. The buffer is
451         assumed to be at least MCL_TSTAMP_SIZE bytes in length.
452
453         Further, when extended headers are being used, this function will
454         automatically resynchronise if it detects an issue.
455
456         The function could look for the delimiter and automatically detect whether
457         or not extended headers are in use, but if the stream is out of synch on the
458         first read, this cannot be done, so the funciton requires that the caller
459         know that the FIFO contains extended headers.
460 */
461 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
462         int fd;
463         int len;
464         int     msg_len;
465         int     got = 0;                                                // number of bytes we actually got
466         int need;
467         char wbuf[4096];
468         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
469         fifo_t* fref = NULL;                            // the fifo struct we found
470
471         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
472                 errno = EINVAL;
473                 return 0;
474         }
475
476         if( (fd = suss_fifo( ctx, mtype, READER, NULL ))  >= 0 )  {
477                 if( long_hdrs ) {
478                         read_header( fd, wbuf );
479                         msg_len = need = atoi( wbuf + MCL_LEN_OFF );                            // read the length
480                         if( timestamp ) {
481                                 strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
482                         }
483                 } else {
484                         if( timestamp != NULL ) {                                               // won't be there, but ensure it's not garbage
485                                 *timestamp = 0;
486                         }
487
488                         len = read( fd, wbuf, MCL_LEN_SIZE );                   // we assume we will get all 8 as there isn't a way to sync the old stream
489                         msg_len = need = atoi( wbuf );
490                 }
491
492
493                 if( need > ublen ) {
494                         need = ublen;                                           // cannot give them more than they can take
495                 }
496                 while( need > 0 ) {
497                         len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
498                         memcpy( ubuf+got, wbuf, len );
499                         got += len;
500                         need -= len;
501                 }
502
503                 if( msg_len > got ) {                                   // we must ditch rest of this message
504                         need = msg_len = got;
505                         while( need > 0 ) {
506                                 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
507                                 need -= len;
508                         }
509                 }
510
511                 return got;
512         }
513
514         errno = EBADFD;
515         return 0;
516 }
517
518 /*
519         Read one record from the fifo that the message type maps to.
520         Writes at max ublen bytes into the ubuf. If extended headers are in use
521         this function will ignore the timestamp.
522
523         If long_hdrs is true (!0), then we expect that the stream in the fifo
524         has extended headers (<delim><len><time>).
525 */
526 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
527         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
528 }
529
530 /*
531         Read a single message from the FIFO returning it in the caller's buffer. If extended
532         headers are being used, and the caller supplied a timestamp buffer, the timestamp
533         which was in the header will be returned in that buffer.  The return value is the number
534         of bytes in the buffer; 0 indicates an error and errno should be set.
535 */
536 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
537         return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
538 }
539
540
541 /*
542         Will read messages and fan them out based on the message type. This should not
543         return and if it does the caller should assume an error.
544
545         The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
546         string , followed by that number of 'raw' bytes. The raw bytes are the payload
547         exactly as received.
548
549         The report parameter is the frequency, in seconds, for writing a short
550         status report to stdout. If 0 then it's off.
551
552         If long_hdr is true, then we geneate an extended header with a delimiter and
553         timestamp.
554 */
555 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr  ) {
556         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
557         fifo_t*         fifo;                                   // fifo to chalk counts on
558         rmr_mbuf_t*     mbuf = NULL;                    // received message buffer; recycled on each call
559         char            header[128];                    // header we'll pop in front of the payload
560         int                     state;
561         int                     fd;                                             // file des to write to
562         long long       total = 0;                              // total messages received and written
563         long long       total_drops = 0;                // total messages received and written
564         long            count = 0;                              // messages received and written during last reporting period
565         long            errors = 0;                             // unsuccessful payload writes
566         long            drops;                                  // number of drops
567         time_t          next_report = 0;                // we'll report every 2 seconds if report is true
568         time_t          now;
569         int                     hwlen;                                  // write len for header
570         void*           rdc_ctx = NULL;                 // raw data capture context
571         void*           rdc_buf = NULL;                 // capture buffer
572
573         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
574                 logit(  LOG_ERR, "(mcl) invalid context given to fanout" );
575                 errno = EINVAL;
576                 return;
577         }
578
579         if( report < 0 ) {
580                 report = 0;
581         }
582
583         rdc_ctx = setup_rdc( );                         // pull rdc directories from enviornment and initialise
584
585         while( 1 ) {
586                 mbuf = mcl_get_msg( ctx, mbuf, report );                        // wait up to report sec for msg (0 == block until message)
587
588                 if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0  ) {
589                         fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo );              // map the message type to an open fd
590                         if( fd >= 0 ) {
591                                 if( long_hdr ) {
592                                         build_hdr( mbuf->len, header, sizeof( header ) );
593                                         hwlen = MCL_EXHDR_SIZE;
594                                 } else {
595                                         snprintf( header, sizeof( header ), "%07d", mbuf->len );                        // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
596                                         hwlen = MCL_LEN_SIZE;
597                                 }
598
599                                 if( (state = write( fd, header, hwlen )) != hwlen ) {           // write exactly MCL_LEN_SIZE bytes from the buffer
600                                         drops++;
601                                         total_drops++;
602                                         chalk_error( fifo );
603                                 } else {
604                                         if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) {                       // followed by the payload
605                                                 errors++;
606                                                 chalk_error( fifo );
607                                         } else {
608                                                 chalk_ok( fifo );
609                                                 count++;
610                                                 total++;
611                                         }
612                                 }
613                         }
614
615                         if( rdc_ctx != NULL ) {
616                                 rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf );                  // set up for write
617                                 rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len );                                                // write the raw data
618                         }
619                 }
620
621                 if( report ) {
622                         if( (now = time( NULL ) ) > next_report ) {
623                         rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report );        // run endpoints in the active table
624                                 fflush( stdout );
625
626                                 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
627                                         total, total_drops, report, count, drops, errors );
628                                 next_report = now + report;
629                                 count = 0;
630                                 drops = 0;
631
632                                 fflush( stdout );
633                         }
634                 }
635         }
636 }
637
638
639 /*
640         Given a buffer and length, along with the message type, look up the fifo and write
641         the buffer. Returns 0 on error; 1 on success.
642 */
643 extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
644         mcl_ctx_t*      ctx;                                    // our context; mostly for the rmr context reference and symtable
645         fifo_t*         fifo;                                   // fifo to chalk counts on
646         int                     state = -1;
647         int                     fd;                                             // file des to write to
648
649         if( plen <= 0 ) {
650                 return 1;
651         }
652
653         if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
654                 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
655                 return 0;
656         }
657
658         fd = suss_fifo( ctx, mtype, WRITER, &fifo );            // map the message type to an open fd
659         if( fd >= 0 ) {
660                 state = write( fd, payload, plen );
661         } 
662
663         return state == plen;
664 }