1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: The mc listener library content. All external functions
23 should start with mcl_ and all stderr messages should have
24 (mcl) as the first token following the severity indicator.
27 Author: E. Scott Daniels
39 #include <sys/types.h>
43 #include <rmr/rmr_symtab.h>
54 Information about one file descriptor. This is pointed to by the hash
55 such that the message type can be used as a key to look up the fifo's
60 int key; // symtab key
61 long long wcount; // number of writes
62 long long drops; // number dropped
64 long long wcount_rp; // number of writes during last reporting period
65 long long drops_rp; // number dropped during last reporting period
69 Our context. Pointers to the read and write hash tables (both keyed on the message
70 type), the message router (RMR) context, and other goodies.
73 void* mrc; // the message router's context
74 void* wr_hash; // symtable to look up pipe info based on mt for writing
75 void* rd_hash; // we support reading from pipes, but need a different FD for that
76 char* fifo_dir; // directory where we open fifos
80 // -------- private -------------------------------------------------------
84 Set up for raw data capture. We look for directory overrides from
85 environment variables, and then invoke the rdc_init() to actually
88 static void* setup_rdc() {
90 int value; // value computed for something
91 char* ep; // pointer to environment var
92 char* sdir = "/tmp/rdc/stage"; // default directory names
93 char* fdir = "/tmp/rdc/final";
94 char* suffix = ".rdc";
97 if( (ep = getenv( "MCL_RDC_ENABLE" )) != NULL ) {
98 if( ep != NULL && atoi( ep ) == 0 ) {
99 logit( LOG_INFO, "(mcl) raw data capture disabled by environment var setting: MCL_RDC_ENABLE=%s", ep );
104 if( (ep = getenv( "MCL_RDC_STAGE" )) != NULL ) {
107 mkdir( "/tmp/rdc", 0755 ); // we ignore failures here as it could likely exist
111 if( (ep = getenv( "MCL_RDC_FINAL" )) != NULL ) {
114 mkdir( "/tmp/rdc", 0755 ); // we ignore failures again -- very likely it's there
118 if( (ep = getenv( "MCL_RDC_SUFFIX" )) != NULL ) {
122 if( (ep = getenv( "MCL_RDC_DONE" )) != NULL ) {
126 ctx = rdc_init( sdir, fdir, suffix, done );
128 logit( LOG_ERR, "rdc_init did not generate a context" );
130 logit( LOG_INFO, "raw data capture files will be staged in: %s", sdir );
131 logit( LOG_INFO, "raw data capture files will be moved for copy to: %s", fdir );
134 if( (ep = getenv( "MCL_RDC_FREQ" )) != NULL ) {
136 logit( LOG_INFO, "setting frequency: %d", value );
137 rdc_set_freq( ctx, value );
143 Builds an extended header in the buffer provided, or allocates a new buffer if
144 dest is nil. The header is of the form:
145 <delim><len><timestamp>
147 Timestamp is a single unsigned long long in ASCII; ms since epoch.
148 If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
149 the timestamp generated will be 1570113591103.
151 static char* build_hdr( int len, char* dest, int dest_len ) {
152 struct timespec ts; // time just before call executed
156 dest = (char *) malloc( sizeof( char ) * dest_len );
158 if( dest_len < 28 ) { // shouldn't happen, but take no chances
159 memset( dest, 0, dest_len );
164 memset( dest, 0, dest_len );
166 clock_gettime( CLOCK_REALTIME, &ts );
167 sprintf( dest, "%s%07d", MCL_DELIM, len );
168 sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
174 Build a file name and open. The io_direction is either READER or
175 WRITER. For a writer we must 'trick' the system into allowing us
176 to open a pipe for writing in non-blocking mode so that we can
177 report on drops (messages we couldn't write because there was no
178 reader). The trick is to open a reader on the pipe so that when
179 we open the writer there's a reader and the open won't fail. As
180 soon as we have the writer open, we can close the junk reader.
182 If the desired fifo does not exist, it is created.
184 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
186 int fd; // real file des
187 int jfd = -1; // junk file des
190 if( ctx == NULL || mtype < 0 ) {
194 snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
196 state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok
197 if( state != 0 && errno != EEXIST ) {
198 logit( LOG_ERR, "(mcl) unable to create fifo: %s: %s", wbuf, strerror( errno ) );
202 if( io_dir == READER ) {
203 fd = open( wbuf, O_RDONLY ); // just open the reader
205 logit( LOG_ERR, "(mcl) fifo open failed (ro): %s: %s", wbuf, strerror( errno ) );
208 jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer
210 logit( LOG_ERR, "(mcl) fifo open failed (rw): %s: %s", wbuf, strerror( errno ) );
214 fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode
216 logit( LOG_ERR, "(mcl) fifo open failed (wo): %s: %s", wbuf, strerror( errno ) );
219 close( jfd ); // should be safe to close this
227 Given a message type, return the file des of the fifo that
228 the payload should be written to. Returns the file des, or -1
229 on error. When sussing out a read file descriptor this will
230 block until there is a fifo for the message type which is
233 If fref is not nil, then a pointer to the fifo info block is returned
234 allowing for direct update of counts after the write.
236 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
240 if( io_dir == READER ) { // with an integer key, we nned two hash tables
246 if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
247 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
252 memset( fifo, 0, sizeof( *fifo ) );
254 fifo->fd = open_fifo( ctx, mtype, io_dir );
255 rmr_sym_map( hash, mtype, fifo );
265 Make marking counts easier in code
267 static inline void chalk_error( fifo_t* fifo ) {
274 static inline void chalk_ok( fifo_t* fifo ) {
282 Callback function driven to traverse the symtab and generate the
283 counts for each fifo.
285 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
287 int report_period = 60;
290 report_period = *((int *) data);
293 if( (fifo = (fifo_t *) thing) != NULL ) {
294 logit( LOG_STAT, "(mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld",
295 fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
297 fifo->wcount_rp = 0; // reset the report counts
302 // ---------- public ------------------------------------------------------
304 Sets a signal handler for sigpipe so we don't crash if a reader closes the
305 last reading fd on a pipe. We could do this automatically, but if the user
306 programme needs to trap sigpipe too, this gives them the option not to have
309 extern int mcl_set_sigh( ) {
310 signal( SIGPIPE, SIG_IGN );
314 "Opens" the interface to RMR such that messages sent to the application will
315 be available via the rmr receive functions. This is NOT automatically called
316 by the mk_context function as some applications will be using the mc library
317 for non-RMR, fifo, chores.
319 extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) {
323 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
327 ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
328 if( ctx->mrc == NULL ) {
329 logit( LOG_CRIT, "start listening: unable to initialise RMr" );
333 while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait
334 if( announce <= 0 ) {
335 logit( LOG_INFO, "waiting for RMR to show ready" );
348 Blocks until a message arrives with a good return code or we timeout. Returns the
349 rmr message buffer. Timeout value expected in seconds.
351 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
354 if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
358 if( ctx->mrc == NULL ) {
359 logit( LOG_CRIT, "get msg: abort: bad rmr context reference (nil)" );
364 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next
365 } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
373 extern void* mcl_mk_context( char* dir ) {
376 if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
377 memset( ctx, 0, sizeof( *ctx ) );
378 ctx->fifo_dir = strdup( dir );
379 ctx->wr_hash = rmr_sym_alloc( 1001 );
380 ctx->rd_hash = rmr_sym_alloc( 1001 );
382 if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) {
383 logit( LOG_ERR, "(mcl) unable to allocate hash table for fifo keys" );
393 Read the header. Best case we read the expected number of bytes, get all
394 of them and find that they start with the delimiter. Worst case
395 we have to wait for all of the header, or need to synch at the next
396 delimiter. We assume best case most likely and handle it as such.
398 static void read_header( int fd, char* buf ) {
400 int need = MCL_EXHDR_SIZE; // total needed
401 int dneed; // delimieter needed
403 char* rp; // read position in buf
405 len = read( fd, buf, need );
406 if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely
411 if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
413 dneed = strlen( MCL_DELIM ) - len;
416 len = read( fd, rp, dneed );
422 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes
423 need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
424 rp = buf + (MCL_EXHDR_SIZE - need);
427 len = read( fd, rp, need );
435 while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
436 len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
439 need = MCL_EXHDR_SIZE - len;
445 Read one record from the fifo that the message type maps to.
446 Writes at max ublen bytes into the ubuf.
448 If long_hdrs is true (!0), then we expect that the stream in the fifo
449 has extended headers (<delim><len><time>), and will write the timestamp
450 from the header into the buffer pointed to by timestamp. The buffer is
451 assumed to be at least MCL_TSTAMP_SIZE bytes in length.
453 Further, when extended headers are being used, this function will
454 automatically resynchronise if it detects an issue.
456 The function could look for the delimiter and automatically detect whether
457 or not extended headers are in use, but if the stream is out of synch on the
458 first read, this cannot be done, so the function requires that the caller
459 know that the FIFO contains extended headers.
461 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
465 int got = 0; // number of bytes we actually got
468 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
469 fifo_t* fref = NULL; // the fifo struct we found
471 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
476 if( (fd = suss_fifo( ctx, mtype, READER, NULL )) >= 0 ) {
478 read_header( fd, wbuf );
479 msg_len = need = atoi( wbuf + MCL_LEN_OFF ); // read the length
481 strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
484 if( timestamp != NULL ) { // won't be there, but ensure it's not garbage
488 len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream
489 msg_len = need = atoi( wbuf );
494 need = ublen; // cannot give them more than they can take
497 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
498 memcpy( ubuf+got, wbuf, len );
503 if( msg_len > got ) { // we must ditch rest of this message
504 need = msg_len = got;
506 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
519 Read one record from the fifo that the message type maps to.
520 Writes at max ublen bytes into the ubuf. If extended headers are in use
521 this function will ignore the timestamp.
523 If long_hdrs is true (!0), then we expect that the stream in the fifo
524 has extended headers (<delim><len><time>).
526 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
527 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
531 Read a single message from the FIFO returning it in the caller's buffer. If extended
532 headers are being used, and the caller supplied a timestamp buffer, the timestamp
533 which was in the header will be returned in that buffer. The return value is the number
534 of bytes in the buffer; 0 indicates an error and errno should be set.
536 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
537 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
542 Will read messages and fan them out based on the message type. This should not
543 return and if it does the caller should assume an error.
545 The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
546 string, followed by that number of 'raw' bytes. The raw bytes are the payload
549 The report parameter is the frequency, in seconds, for writing a short
550 status report to stdout. If 0 then it's off.
552 If long_hdr is true, then we generate an extended header with a delimiter and
555 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
556 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
557 fifo_t* fifo; // fifo to chalk counts on
558 rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
559 char header[128]; // header we'll pop in front of the payload
561 int fd; // file des to write to
562 long long total = 0; // total messages received and written
563 long long total_drops = 0; // total messages received and written
564 long count = 0; // messages received and written during last reporting period
565 long errors = 0; // unsuccessful payload writes
566 long drops; // number of drops
567 time_t next_report = 0; // we'll report every 2 seconds if report is true
569 int hwlen; // write len for header
570 void* rdc_ctx = NULL; // raw data capture context
571 void* rdc_buf = NULL; // capture buffer
573 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
574 logit( LOG_ERR, "(mcl) invalid context given to fanout" );
583 rdc_ctx = setup_rdc( ); // pull rdc directories from enviornment and initialise
586 mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
588 if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0 ) {
589 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
592 build_hdr( mbuf->len, header, sizeof( header ) );
593 hwlen = MCL_EXHDR_SIZE;
595 snprintf( header, sizeof( header ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
596 hwlen = MCL_LEN_SIZE;
599 if( (state = write( fd, header, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
604 if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
615 if( rdc_ctx != NULL ) {
616 rdc_buf = rdc_init_buf( mbuf->mtype, header, hwlen, rdc_buf ); // set up for write
617 rdc_write( rdc_ctx, rdc_buf, mbuf->payload, mbuf->len ); // write the raw data
622 if( (now = time( NULL ) ) > next_report ) {
623 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
626 logit( LOG_STAT, "(mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors",
627 total, total_drops, report, count, drops, errors );
628 next_report = now + report;
640 Given a buffer and length, along with the message type, look up the fifo and write
641 the buffer. Returns 0 on error; 1 on success.
643 extern int mcl_fifo_one( void* vctx, char* payload, int plen, int mtype ) {
644 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
645 fifo_t* fifo; // fifo to chalk counts on
647 int fd; // file des to write to
653 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
654 logit( LOG_ERR, "(mcl) invalid context given to fifo_one\n" );
658 fd = suss_fifo( ctx, mtype, WRITER, &fifo ); // map the message type to an open fd
660 state = write( fd, payload, plen );
663 return state == plen;