1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: The mc listener library content. All external functions
23 should start with mcl_ and all stderr messages should have
24 (mcl) as the first token following the severity indicator.
27 Author: E. Scott Daniels
39 #include <sys/types.h>
43 #include <rmr/rmr_symtab.h>
58 Information about one file descriptor. This is pointed to by the hash
59 such that the message type can be used as a key to look up the fifo's
64 int key; // symtab key
65 long long wcount; // number of writes
66 long long drops; // number dropped
68 long long wcount_rp; // number of writes during last reporting period
69 long long drops_rp; // number dropped during last reporting period
73 Our context. Pointers to the read and write hash tables (both keyed on the message
74 type), the message router (RMR) context, and other goodies.
77 void* mrc; // the message router's context
78 void* wr_hash; // symtable to look up pipe info based on mt for writing
79 void* rd_hash; // we support reading from pipes, but need a different FD for that
80 char* fifo_dir; // directory where we open fifos
84 // -------- private -------------------------------------------------------
88 Set up for raw data capture. We look for directory overrides from
89 environment variables, and then invoke the rdc_init() to actually
// Configure raw data capture (rdc) from MCL_RDC_* environment variables and
// pass the resulting settings to rdc_init().
// Defaults visible here: stage dir "/tmp/rdc/stage", final dir "/tmp/rdc/final",
// file suffix ".rdc". Environment variables consulted: MCL_RDC_ENABLE,
// MCL_RDC_STAGE, MCL_RDC_FINAL, MCL_RDC_SUFFIX, MCL_RDC_DONE, MCL_RDC_FREQ.
// NOTE(review): several lines of this function (including the declarations of
// ctx and done, and all return statements) are not visible in this view; the
// comments below describe only what is shown.
// NOTE(review): the three log messages around rdc_init() do not carry the
// "(mcl)" token that the other messages in this file use.
92 static void* setup_rdc() {
94 int value; // frequency value that is logged and passed to rdc_set_freq()
95 char* ep; // pointer to environment var
96 char* sdir = "/tmp/rdc/stage"; // default directory names
97 char* fdir = "/tmp/rdc/final";
98 char* suffix = ".rdc";
// MCL_RDC_ENABLE: a value that atoi() converts to 0 is reported as "disabled".
101 if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
// NOTE(review): `ep != NULL` repeats the test already made by the enclosing if.
102 if( ep != NULL && atoi( ep ) == 0 ) {
103 logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
108 if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
111 mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist
115 if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
118 mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there
122 if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
126 if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
// Create the capture context from the directories, suffix and done marker.
130 ctx = rdc_init( sdir, fdir, suffix, done );
132 logit( LOG_ERR, "rdc_init did not generate a context" );
134 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
135 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
// MCL_RDC_FREQ: when set, a frequency is logged and applied to the context.
138 if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
140 logit( LOG_INFO, "setting frequency: %d", value );
141 rdc_set_freq( ctx, value );
147 Builds an extended header in the buffer provided, or allocates a new buffer if
148 dest is nil. The header is of the form:
149 <delim><len><timestamp>
151 Timestamp is a single unsigned long long in ASCII; ms since epoch.
152 If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
153 the timestamp generated will be 1570113591103.
// Format an extended header into dest: MCL_DELIM followed by len as a
// zero-padded 7 digit decimal, then (starting at byte offset 12) the current
// CLOCK_REALTIME seconds immediately followed by 3 digits of milliseconds.
//   len      - payload length to record in the header
//   dest     - output buffer; a buffer of dest_len bytes is malloc'd on one path
//   dest_len - size of dest in bytes; values below 28 take a separate path
//              that zeroes the buffer
// NOTE(review): the conditions around the malloc and the return statements are
// not visible in this view.
155 static char* build_hdr( int len, char* dest, int dest_len ) {
156 struct timespec ts; // time just before call executed
// NOTE(review): no check of the malloc result is visible here -- confirm.
160 dest = (char *) malloc( sizeof( char ) * dest_len );
162 if( dest_len < 28 ) { // shouldn't happen, but take no chances
163 memset( dest, 0, dest_len );
168 memset( dest, 0, dest_len );
170 clock_gettime( CLOCK_REALTIME, &ts );
// NOTE(review): both sprintf calls are unbounded; they rely on the dest_len
// guard above. The literal 12 presumably corresponds to the header layout
// constants used by the reader (MCL_LEN_OFF / MCL_TSTAMP_OFF) -- confirm.
171 sprintf( dest, "%s%07d", MCL_DELIM, len );
// tv_nsec/1000000 converts nanoseconds to milliseconds.
// NOTE(review): %ld assumes time_t is a long on the target platform.
172 sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
178 Build a file name and open. The io_direction is either READER or
179 WRITER. For a writer we must 'trick' the system into allowing us
180 to open a pipe for writing in non-blocking mode so that we can
181 report on drops (messages we couldn't write because there was no
182 reader). The trick is to open a reader on the pipe so that when
183 we open the writer there's a reader and the open won't fail. As
184 soon as we have the writer open, we can close the junk reader.
186 If the desired fifo does not exist, it is created.
// Create (if needed) and open the fifo for a message type.
// The path is "<ctx->fifo_dir>/MT_<mtype as 9 zero-padded digits>".
//   ctx    - listener context supplying fifo_dir; NULL is rejected
//   mtype  - message type; negative values are rejected
//   io_dir - READER opens the fifo O_RDONLY; otherwise the writer path is used
// Writer path: a placeholder descriptor is opened O_RDWR|O_NONBLOCK first, the
// real descriptor is then opened O_WRONLY|O_NONBLOCK, and the placeholder is
// closed.
// NOTE(review): the declarations of wbuf and state, and all return statements,
// are not visible in this view.
188 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
190 int fd; // real file des
191 int jfd = -1; // junk file des
194 if( ctx == NULL || mtype < 0 ) {
198 snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
200 state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok
// Only failures other than "already exists" are reported.
201 if( state != 0 && errno != EEXIST ) {
202 logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
206 if( io_dir == READER ) {
207 fd = open( wbuf, O_RDONLY ); // just open the reader
209 logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
212 jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer
214 logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
218 fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode
220 logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
223 close( jfd ); // should be safe to close this
231 Given a message type, return the file des of the fifo that
232 the payload should be written to. Returns the file des, or -1
233 on error. When sussing out a read file descriptor this will
234 block until there is a fifo for the message type which is
237 If fref is not nil, then a pointer to the fifo info block is returned
238 allowing for direct update of counts after the write.
// Map a message type to the file descriptor of its fifo, opening the fifo on
// first use.
//   ctx    - listener context holding the hash tables
//   mtype  - message type used as the hash key
//   io_dir - READER or writer; selects which hash table is consulted
//   fref   - per the description preceding this function, receives a pointer
//            to the fifo block when non-NULL (assignment not visible here)
// Returns the fifo's fd, or -1 when no fifo block is available.
// NOTE(review): the declarations of hash and fifo and the hash selection are
// not visible in this view.
240 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
251 if( io_dir == READER ) { // with an integer key, we need two hash tables
// Cache miss: allocate and zero a new fifo block, then try to open the fifo.
257 if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
258 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
260 memset( fifo, 0, sizeof( *fifo ) );
262 fifo->fd = open_fifo( ctx, mtype, io_dir );
// NOTE(review): when the open fails the block is not added to the hash; what
// happens to the allocation in that case is not visible here -- confirm.
263 if( fifo->fd >= 0 ) { // save only on good open
264 rmr_sym_map( hash, mtype, fifo );
276 return fifo == NULL ? -1 : fifo->fd;
280 Make marking counts easier in code
282 static inline void chalk_error( fifo_t* fifo ) {
289 static inline void chalk_ok( fifo_t* fifo ) {
297 Callback function driven to traverse the symtab and generate the
298 counts for each fifo.
// Symtab traversal callback: logs the write and drop counters of one fifo.
//   thing - the fifo_t stored in the table for this entry; NULL is skipped
//   data  - when used, points at an int holding the reporting period in
//           seconds; the period otherwise stays at 60
//   st, entry, name - not referenced in the lines visible here
// After logging, the per-period write counter is reset.
// NOTE(review): the condition guarding the read of *data and any reset of
// drops_rp are not visible in this view.
300 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
302 int report_period = 60;
305 report_period = *((int *) data);
308 if( (fifo = (fifo_t *) thing) != NULL ) {
309 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
310 fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
312 fifo->wcount_rp = 0; // reset the report counts
317 // ---------- public ------------------------------------------------------
319 Sets a signal handler for sigpipe so we don't crash if a reader closes the
320 last reading fd on a pipe. We could do this automatically, but if the user
321 programme needs to trap sigpipe too, this gives them the option not to have
// Ignore SIGPIPE for the whole process by installing SIG_IGN as its handler.
// NOTE(review): the return statement is not visible in this view.
324 extern int mcl_set_sigh( ) {
325 signal( SIGPIPE, SIG_IGN );
329 "Opens" the interface to RMR such that messages sent to the application will
330 be available via the rmr receive functions. This is NOT automatically called
331 by the mk_context function as some applications will be using the mc library
332 for non-RMR, fifo, chores.
// Initialise RMR for this context and optionally wait until it reports ready.
//   vctx       - listener context (an mcl_ctx_t); NULL is rejected
//   port       - passed unchanged to rmr_init()
//   wait4ready - when non-zero, loop until rmr_ready() returns true
// The RMR context is stored in ctx->mrc; a NULL result is logged as critical.
// NOTE(review): the declarations of ctx and announce, the body of the wait
// loop beyond the log call, and the return values are not visible in this view.
334 extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) {
338 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
342 ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
343 if( ctx->mrc == NULL ) {
344 logit( LOG_CRIT, "start listening: unable to initialise RMr" );
348 while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait
// A progress message is logged whenever the announce counter has run down.
349 if( announce <= 0 ) {
350 logit( LOG_INFO, "waiting for RMR to show ready" );
363 Blocks until a message arrives with a good return code or we time out. Returns the
364 rmr message buffer. Timeout value expected in seconds.
// Receive the next RMR message, retrying until the result is usable.
//   vctx    - listener context (an mcl_ctx_t); NULL is rejected
//   msg     - message buffer handed to rmr_torcv_msg() on each attempt
//   timeout - multiplied by 1000 before being passed to rmr_torcv_msg()
//             (presumably seconds to milliseconds -- confirm against RMR docs)
// The receive is repeated while the returned buffer is NULL or its state is
// neither RMR_OK nor RMR_ERR_TIMEOUT.
// NOTE(review): the return statements are not visible in this view.
366 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
369 if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
373 if( ctx->mrc == NULL ) {
374 logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
379 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next
380 } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
// Allocate and initialise a listener context.
//   dir - directory in which fifos are opened; a private copy is stored in
//         ctx->fifo_dir
// The context is zeroed, then two symtabs (write and read) are allocated with
// rmr_sym_alloc( 1001 ). A failure of either allocation is logged.
// NOTE(review): no check of dir is visible before strdup(); the cleanup after
// a symtab allocation failure and the return statements are not visible here.
388 extern void* mcl_mk_context( char* dir ) {
391 if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
392 memset( ctx, 0, sizeof( *ctx ) );
393 ctx->fifo_dir = strdup( dir );
394 ctx->wr_hash = rmr_sym_alloc( 1001 );
395 ctx->rd_hash = rmr_sym_alloc( 1001 );
397 if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) {
398 logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
408 Read the header. Best case we read the expected number of bytes, get all
409 of them and find that they start with the delimiter. Worst case,
410 we have to wait for all of the header, or need to synch at the next
411 delimiter. We assume best case most likely and handle it as such.
// Read one extended header (MCL_EXHDR_SIZE bytes) from fd into buf.
//   fd  - descriptor to read from
//   buf - destination; must hold at least MCL_EXHDR_SIZE bytes
// Fast path: a single read() returns the whole header and it starts with
// MCL_DELIM. Otherwise the visible code (a) reads more bytes until the
// delimiter can be compared, (b) on a delimiter match reads the remainder of
// the header, and (c) on a mismatch reads one byte at a time until buf[0]
// equals the first delimiter byte.
// NOTE(review): the surrounding loops, the declaration of len and any checks
// of read() results are not visible in this view.
413 static void read_header( int fd, char* buf ) {
415 int need = MCL_EXHDR_SIZE; // total needed
416 int dneed; // delimieter needed
418 char* rp; // read position in buf
420 len = read( fd, buf, need );
421 if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely
// NOTE(review): if len is a signed int, a read() failure (-1) is converted to
// a large unsigned value in this comparison and the branch is skipped.
426 if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
428 dneed = strlen( MCL_DELIM ) - len;
431 len = read( fd, rp, dneed );
437 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes
// The read position is placed just past the delimiter.
// NOTE(review): need is derived only from the header and delimiter sizes; any
// bytes beyond the delimiter obtained by an earlier partial read do not appear
// to be accounted for -- confirm against the lines missing from this view.
438 need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
439 rp = buf + (MCL_EXHDR_SIZE - need);
442 len = read( fd, rp, need );
450 while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
451 len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
454 need = MCL_EXHDR_SIZE - len;
460 Read one record from the fifo that the message type maps to.
461 Writes at max ublen bytes into the ubuf.
463 If long_hdrs is true (!0), then we expect that the stream in the fifo
464 has extended headers (<delim><len><time>), and will write the timestamp
465 from the header into the buffer pointed to by timestamp. The buffer is
466 assumed to be at least MCL_TSTAMP_SIZE bytes in length.
468 Further, when extended headers are being used, this function will
469 automatically resynchronise if it detects an issue.
471 The function could look for the delimiter and automatically detect whether
472 or not extended headers are in use, but if the stream is out of synch on the
473 first read, this cannot be done, so the function requires that the caller
474 know that the FIFO contains extended headers.
// Read one length-prefixed record from the fifo mapped to mtype into ubuf.
//   vctx      - listener context (an mcl_ctx_t); NULL is rejected
//   mtype     - message type selecting the fifo (opened via suss_fifo as READER)
//   ubuf      - caller's buffer for the payload
//   ublen     - capacity of ubuf; at most this many bytes are copied
//   long_hdrs - selects the extended header format (see the description
//               preceding this function)
//   timestamp - optional buffer that receives the header timestamp string
// With extended headers the length is parsed at wbuf + MCL_LEN_OFF and the
// timestamp is copied from wbuf + MCL_TSTAMP_OFF+1; otherwise MCL_LEN_SIZE
// bytes are read and parsed as the length. The payload is then read through
// wbuf in chunks no larger than sizeof( wbuf ).
// NOTE(review): the declarations of fd, len, msg_len, need and wbuf, the loop
// headers and the return statements are not visible in this view.
476 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
480 int got = 0; // number of bytes we actually got
483 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
484 fifo_t* fref = NULL; // the fifo struct we found
486 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
491 if( (fd = suss_fifo( ctx, mtype, READER, NULL )) >= 0 ) {
493 read_header( fd, wbuf );
494 msg_len = need = atoi( wbuf + MCL_LEN_OFF ); // read the length
// NOTE(review): unbounded copy; relies on the caller's timestamp buffer being
// large enough for the header's timestamp text.
496 strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
499 if( timestamp != NULL ) { // won't be there, but ensure it's not garbage
503 len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream
504 msg_len = need = atoi( wbuf );
509 need = ublen; // cannot give them more than they can take
512 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
// NOTE(review): len is used as the copy size directly after read(); a failed
// read (-1) would become a very large size_t here.
513 memcpy( ubuf+got, wbuf, len );
518 if( msg_len > got ) { // we must ditch rest of this message
// NOTE(review): this assigns got to both msg_len and need, so need is not the
// number of remaining bytes; `msg_len - got` looks like the intent -- confirm.
519 need = msg_len = got;
521 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
534 Read one record from the fifo that the message type maps to.
535 Writes at max ublen bytes into the ubuf. If extended headers are in use
536 this function will ignore the timestamp.
538 If long_hdrs is true (!0), then we expect that the stream in the fifo
539 has extended headers (<delim><len><time>).
// Public wrapper: forwards every argument to fifo_read1() with a NULL
// timestamp buffer, so any header timestamp is not returned to the caller.
541 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
542 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
546 Read a single message from the FIFO returning it in the caller's buffer. If extended
547 headers are being used, and the caller supplied a timestamp buffer, the timestamp
548 which was in the header will be returned in that buffer. The return value is the number
549 of bytes in the buffer; 0 indicates an error and errno should be set.
// Public wrapper: forwards every argument, including the caller's timestamp
// buffer, to fifo_read1() and returns its result unchanged.
551 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
552 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
557 Will read messages and fan them out based on the message type. This should not
558 return and if it does the caller should assume an error.
560 The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
561 string, followed by that number of 'raw' bytes. The raw bytes are the payload
564 The report parameter is the frequency, in seconds, for writing a short
565 status report to stdout. If 0 then it's off.
567 If long_hdr is true, then we generate an extended header with a delimiter and
// Receive RMR messages in a loop and write each one to the fifo that matches
// its message type, prefixed by a header.
//   vctx     - listener context (an mcl_ctx_t); NULL is rejected with an error log
//   report   - passed to mcl_get_msg() as the receive timeout and used as the
//              interval, in seconds, between statistics reports
//   long_hdr - selects build_hdr() (MCL_EXHDR_SIZE bytes) over the plain
//              7 digit length header (MCL_LEN_SIZE bytes)
// When setup_rdc() returns a context, the header and payload of each message
// are also handed to rdc_init_buf() / rdc_write().
// NOTE(review): the declarations of state and now, the counter updates and
// the conditions between several visible statements are not shown in this view.
570 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
571 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
572 fifo_t* fifo; // fifo to chalk counts on
573 rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
574 char header[128]; // header we'll pop in front of the payload
576 int fd; // file des to write to
577 long long total = 0; // total messages received and written
578 long long total_drops = 0; // total messages dropped
579 long count = 0; // messages received and written during last reporting period
580 long errors = 0; // unsuccessful payload writes
// NOTE(review): unlike its neighbours, drops has no initialiser here and it is
// passed to the stats log below -- confirm it is assigned before first use.
581 long drops; // number of drops
582 time_t next_report = 0; // time of the next stats report; advanced by report seconds each time
584 int hwlen; // write len for header
585 void* rdc_ctx = NULL; // raw data capture context
586 void* rdc_buf = NULL; // capture buffer
588 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
589 logit( LOG_ERR, "(mcl) invalid context given to fanout" );
598 rdc_ctx = setup_rdc( ); // pull rdc directories from enviornment and initialise
601 mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
// Only good, non-empty messages are fanned out.
603 if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0 ) {
604 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
607 build_hdr( mbuf->len, header, sizeof( header ) );
608 hwlen = MCL_EXHDR_SIZE;
610 snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
611 hwlen = MCL_LEN_SIZE;
614 if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly hwlen header bytes (MCL_LEN_SIZE or MCL_EXHDR_SIZE)
619 if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
630 if( rdc_ctx != NULL ) {
631 rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
632 rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
// Periodic reporting: per-fifo stats via wr_stats, then the overall totals.
637 if( (now = time( NULL ) ) > next_report ) {
638 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
641 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
642 total, total_drops, report, count, drops, errors );
643 next_report = now + report;
650 } while( FOREVER ); // forever allows for escape during unit testing
655 Given a buffer and length, along with the message type, look up the fifo and write
656 the buffer. Returns 0 on error; 1 on success.
// Write one caller-supplied buffer to the fifo mapped to mtype.
//   vctx    - listener context (an mcl_ctx_t); NULL is rejected with an error log
//   payload - bytes to write
//   plen    - number of bytes to write
//   mtype   - message type selecting the fifo (opened via suss_fifo as WRITER)
// Returns 1 when write() reports exactly plen bytes written, otherwise 0.
// NOTE(review): the declaration of state and any check of fd before the write
// are not visible in this view. The error message below ends with a newline,
// which the other logit() calls in this file do not use.
658 extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
659 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
660 fifo_t* fifo; // fifo to chalk counts on
662 int fd; // file des to write to
668 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
669 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
673 fd = suss_fifo( ctx, mtype, WRITER, &fifo ); // map the message type to an open fd
675 state = write( fd, payload, plen );
678 return state == plen;