1 // vim: ts=4 sw=4 noet:
3 --------------------------------------------------------------------------------
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 --------------------------------------------------------------------------------
22 Abstract: The mc listener library content. All external functions
23 should start with mcl_ and all stderr messages should have
24 (mcl) as the first token following the severity indicator.
27 Author: E. Scott Daniels
41 #include <rmr/rmr_symtab.h>
53 Information about one file descriptor. This is pointed to by the hash
54 such that the message type can be used as a key to look up the fifo's
59 int key; // symtab key
60 long long wcount; // number of writes
61 long long drops; // number dropped
63 long long wcount_rp; // number of writes during last reporting period
64 long long drops_rp; // number dropped during last reporting period
68 Our conext. Pointers to the read and write hash tables (both keyed on the message
69 type), the message router (RMR) context, and other goodies.
72 void* mrc; // the message router's context
73 void* wr_hash; // symtable to look up pipe info based on mt for writing
74 void* rd_hash; // we support reading from pipes, but need a different FD for that
75 char* fifo_dir; // directory where we open fifos
79 // -------- private -------------------------------------------------------
83 Builds an extended header in the buffer provided, or allocates a new buffer if
84 dest is nil. The header is of the form:
85 <delim><len><timestamp>
87 Timestamp is a single unsigned long long in ASCII; ms since epoch.
88 If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103
89 the timestamp generated will be 1570113591103.
91 static char* build_hdr( int len, char* dest, int dest_len ) {
92 struct timespec ts; // time just before call executed
96 dest = (char *) malloc( sizeof( char ) * dest_len );
98 if( dest_len < 28 ) { // shouldn't happen, but take no chances
99 memset( dest, 0, dest_len );
104 memset( dest, 0, dest_len );
106 clock_gettime( CLOCK_REALTIME, &ts );
107 sprintf( dest, "%s%07d", MCL_DELIM, len );
108 sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 );
114 Build a file name and open. The io_direction is either READER or
115 WRITER. For a writer we must 'trick' the system into allowing us
116 to open a pipe for writing in non-blocking mode so that we can
117 report on drops (messages we couldn't write because there was no
118 reader). The trick is to open a reader on the pipe so that when
119 we open the writer there's a reader and the open won't fail. As
120 soon as we have the writer open, we can close the junk reader.
122 If the desired fifo does not exist, it is created.
124 static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) {
126 int fd; // real file des
127 int jfd = -1; // junk file des
130 if( ctx == NULL || mtype < 0 ) {
134 snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype );
136 state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok
137 if( state != 0 && errno != EEXIST ) {
138 fprintf( stderr, "[ERR] (mcl) unable to create fifo: %s: %s\n", wbuf, strerror( errno ) );
142 if( io_dir == READER ) {
143 fd = open( wbuf, O_RDONLY ); // just open the reader
145 fprintf( stderr, "[ERR] (mcl) fifo open failed (ro): %s: %s\n", wbuf, strerror( errno ) );
148 jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer
150 fprintf( stderr, "[ERR] (mcl) fifo open failed (rw): %s: %s\n", wbuf, strerror( errno ) );
153 fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode
155 fprintf( stderr, "[ERR] (mcl) fifo open failed (wo): %s: %s\n", wbuf, strerror( errno ) );
158 close( jfd ); // should be safe to close this
166 Given a message type, return the file des of the fifo that
167 the payload should be written to. Returns the file des, or -1
168 on error. When sussing out a read file descriptor this will
169 block until there is a fifo for the message type which is
172 If fref is not nil, then a pointer to the fifo info block is returned
173 allowing for direct update of counts after the write.
175 static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) {
179 if( io_dir == READER ) { // with an integer key, we nned two hash tables
185 if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) {
186 fifo = (fifo_t *) malloc( sizeof( *fifo ) );
191 memset( fifo, 0, sizeof( *fifo ) );
193 fifo->fd = open_fifo( ctx, mtype, io_dir );
194 rmr_sym_map( hash, mtype, fifo );
204 Make marking counts easier in code
206 static inline void chalk_error( fifo_t* fifo ) {
213 static inline void chalk_ok( fifo_t* fifo ) {
221 Callback function driven to traverse the symtab and generate the
222 counts for each fifo.
224 static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) {
226 int report_period = 60;
229 report_period = *((int *) data);
232 if( (fifo = (fifo_t *) thing) != NULL ) {
233 fprintf( stdout, "[STAT] (mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld\n",
234 fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp );
236 fifo->wcount_rp = 0; // reset the report counts
241 // ---------- public ------------------------------------------------------
243 Sets a signal handler for sigpipe so we don't crash if a reader closes the
244 last reading fd on a pipe. We could do this automatically, but if the user
245 programme needs to trap sigpipe too, this gives them the option not to have
248 extern int mcl_set_sigh( ) {
249 signal( SIGPIPE, SIG_IGN );
253 "Opens" the interface to RMR such that messages sent to the application will
254 be available via the rmr receive funcitons. This is NOT automatically called
255 by the mk_context function as some applications will be using the mc library
256 for non-RMR, fifo, chores.
258 extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) {
262 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
266 ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
267 if( ctx->mrc == NULL ) {
268 fprintf( stderr, "[CRIT] unable to initialise RMr\n" );
272 while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait
273 if( announce <= 0 ) {
274 fprintf( stderr, "[INFO] waiting for RMR to show ready\n" );
287 Blocks until a message arives with a good return code or we timeout. Returns the
288 rmr message buffer. Timeout value epxected in seconds.
290 extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) {
293 if( (ctx = (mcl_ctx_t *) vctx) == NULL ) {
297 if( ctx->mrc == NULL ) {
298 fprintf( stderr, "bad context\n" );
303 msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next
304 } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) );
312 extern void* mcl_mk_context( char* dir ) {
315 if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) {
316 memset( ctx, 0, sizeof( *ctx ) );
317 ctx->fifo_dir = strdup( dir );
318 ctx->wr_hash = rmr_sym_alloc( 1001 );
319 ctx->rd_hash = rmr_sym_alloc( 1001 );
321 if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) {
322 fprintf( stderr, "[ERR] (mcl) unable to allocate hash table for fifo keys\n" );
332 Read the header. Best case we read the expected number of bytes, get all
333 of them and find that they start with the delemiter. Worst case
334 We have to wait for all of the header, or need to synch at the next
335 delimeter. We assume best case most likely and handle it as such.
337 static void read_header( int fd, char* buf ) {
339 int need = MCL_EXHDR_SIZE; // total needed
340 int dneed; // delimieter needed
342 char* rp; // read position in buf
344 len = read( fd, buf, need );
345 if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely
350 if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim
352 dneed = strlen( MCL_DELIM ) - len;
355 len = read( fd, rp, dneed );
361 if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes
362 need = MCL_EXHDR_SIZE - strlen( MCL_DELIM );
363 rp = buf + (MCL_EXHDR_SIZE - need);
366 len = read( fd, rp, need );
374 while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop
375 len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer)
378 need = MCL_EXHDR_SIZE - len;
384 Read one record from the fifo that the message type maps to.
385 Writes at max ublen bytes into the ubuf.
387 If long_hdrs is true (!0), then we expect that the stream in the fifo
388 has extended headers (<delim><len><time>), and will write the timestamp
389 from the header into the buffer pointed to by timestamp. The buffer is
390 assumed to be at least MCL_TSTAMP_SIZE bytes in length.
392 Further, when extended headers are being used, this function will
393 automatically resynchronise if it detects an issue.
395 The function could look for the delimiter and automatically detect whether
396 or not extended headers are in use, but if the stream is out of synch on the
397 first read, this cannot be done, so the funciton requires that the caller
398 know that the FIFO contains extended headers.
400 static int fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char* timestamp ) {
404 int got = 0; // number of bytes we actually got
407 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
408 fifo_t* fref = NULL; // the fifo struct we found
410 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
415 if( (fd = suss_fifo( ctx, mtype, READER, NULL )) >= 0 ) {
417 read_header( fd, wbuf );
418 msg_len = need = atoi( wbuf + MCL_LEN_OFF ); // read the length
420 strcpy( timestamp, wbuf + MCL_TSTAMP_OFF+1 );
423 if( timestamp != NULL ) { // won't be there, but ensure it's not garbage
427 len = read( fd, wbuf, MCL_LEN_SIZE ); // we assume we will get all 8 as there isn't a way to sync the old stream
428 msg_len = need = atoi( wbuf );
433 need = ublen; // cannot give them more than they can take
436 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
437 memcpy( ubuf+got, wbuf, len );
442 if( msg_len > got ) { // we must ditch rest of this message
443 need = msg_len = got;
445 len = read( fd, wbuf, need > sizeof( wbuf ) ? sizeof( wbuf ) : need );
458 Read one record from the fifo that the message type maps to.
459 Writes at max ublen bytes into the ubuf. If extended headers are in use
460 this function will ignore the timestamp.
462 If long_hdrs is true (!0), then we expect that the stream in the fifo
463 has extended headers (<delim><len><time>).
465 extern int mcl_fifo_read1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs ) {
466 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, NULL );
470 Read a single message from the FIFO returning it in the caller's buffer. If extended
471 headers are being used, and the caller supplied a timestamp buffer, the timestamp
472 which was in the header will be returned in that buffer. The return value is the number
473 of bytes in the buffer; 0 indicates an error and errno should be set.
475 extern int mcl_fifo_tsread1( void *vctx, int mtype, char* ubuf, int ublen, int long_hdrs, char *timestamp ) {
476 return fifo_read1( vctx, mtype, ubuf, ublen, long_hdrs, timestamp );
481 Will read messages and fan them out based on the message type. This should not
482 return and if it does the caller should assume an error.
484 The output to each fifo is MCL_LEN_SIZE bytes with an ASCII, zero terminated, length
485 string , followed by that number of 'raw' bytes. The raw bytes are the payload
488 The report parameter is the frequency, in seconds, for writing a short
489 status report to stdout. If 0 then it's off.
491 If long_hdr is true, then we geneate an extended header with a delimiter and
494 extern void mcl_fifo_fanout( void* vctx, int report, int long_hdr ) {
495 mcl_ctx_t* ctx; // our context; mostly for the rmr context reference and symtable
496 fifo_t* fifo; // fifo to chalk counts on
497 rmr_mbuf_t* mbuf = NULL; // received message buffer; recycled on each call
498 char wbuf[128]; // buffer to build len string in
500 int fd; // file des to write to
501 long long total = 0; // total messages received and written
502 long long total_drops = 0; // total messages received and written
503 long count = 0; // messages received and written during last reporting period
504 long errors = 0; // unsuccessful payload writes
505 long drops; // number of drops
506 time_t next_report = 0; // we'll report every 2 seconds if report is true
508 int hwlen; // write len for header
510 if( (ctx = (mcl_ctx_t*) vctx) == NULL ) {
511 fprintf( stderr, "[ERR] (mcl) invalid context given to fanout\n" );
521 mbuf = mcl_get_msg( ctx, mbuf, report ); // wait up to report sec for msg (0 == block until message)
523 if( mbuf != NULL && mbuf->state == RMR_OK && mbuf->len > 0 ) {
524 fd = suss_fifo( ctx, mbuf->mtype, WRITER, &fifo ); // map the message type to an open fd
527 build_hdr( mbuf->len, wbuf, sizeof( wbuf ) );
528 hwlen = MCL_EXHDR_SIZE;
530 snprintf( wbuf, sizeof( wbuf ), "%07d", mbuf->len ); // size of payload CAUTION: 7d is MCL_LEN_SIZE-1
531 hwlen = MCL_LEN_SIZE;
534 if( (state = write( fd, wbuf, hwlen )) != hwlen ) { // write exactly MCL_LEN_SIZE bytes from the buffer
539 if( write( fd, mbuf->payload, mbuf->len ) < mbuf->len ) { // followed by the payload
552 if( (now = time( NULL ) ) > next_report ) {
553 rmr_sym_foreach_class( ctx->wr_hash, 0, wr_stats, &report ); // run endpoints in the active table
556 fprintf( stdout, "[STAT] (mcl) total writes=%lld total drops=%lld; during last %ds writes=%ld drops=%ld errs=%ld errors\n",
557 total, total_drops, report, count, drops, errors );
558 next_report = now + report;