From: E. Scott Daniels Date: Fri, 4 Oct 2019 13:07:42 +0000 (-0400) Subject: Initial repo population X-Git-Tag: 1.0.1~18 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=5f75c08e01fa73a8f54dd0d474c0a50b8dd23845;p=ric-app%2Fmc.git Initial repo population Signed-off-by: E. Scott Daniels Change-Id: I2652015adb5c77d0f13a75f9651e249cd0d9634d --- diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..14f0ba9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,49 @@ +# vim: ts=4 sw=4 noet: + +# ------------------------------------------------------------------------------- +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------- + +# CAUTION: This file eventually should exist in the ci directory, however until +# this can be confirmed, and the .yaml file(s) in the ci project changed +# to indicaate that ci/Dockerfile should be used, this is here with minor +# changes needed to exist at the root. +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +# CI to verify the MC application components +# Inherits C toolchain from buildpack-deps:stretch then adds cmake and better shell(s). + +# It is assumed that this docker file is used with a build command run at the +# root level of the repo (directory containing the ci directory). E.g. +# docker build -f ci/Dockerfile . + +FROM buildpack-deps:stretch + +RUN apt-get update && apt-get -q -y install cmake ksh + +# stuff our repo things into a scratch area +RUN mkdir /playpen +ADD . /playpen + + +# add any unit test scripts that need to be driven, e.g. +# RUN ksh test/mcl_unit_test.ksh + +# This is a final/only script which might print useful things to the log and ALWAYS succeeds. +RUN ksh /playpen/ci/stats.ksh + +# there is no cmd; the build/verification assumes that if the image is created +# successfully, e.g. none of the previous run commands fail, that the environment +# is successfully vetted. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e7bf3d1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,29 @@ + + Unless otherwise specified, all software contained herein is licensed + under the Apache License, Version 2.0 (the "Software License"); + you may not use this software except in compliance with the Software + License. You may obtain a copy of the Software License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the Software License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the Software License for the specific language governing permissions + and limitations under the Software License. + + + + Unless otherwise specified, all documentation contained herein is licensed + under the Creative Commons License, Attribution 4.0 Intl. (the + "Documentation License"); you may not use this documentation except in + compliance with the Documentation License. You may obtain a copy of the + Documentation License at + + https://creativecommons.org/licenses/by/4.0/ + + Unless required by applicable law or agreed to in writing, documentation + distributed under the Documentation License is distributed on an "AS IS" + BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied. See the Documentation License for the specific language governing + permissions and limitations under the Documentation License. diff --git a/ci/Dockerfile b/ci/Dockerfile new file mode 100644 index 0000000..57e70a0 --- /dev/null +++ b/ci/Dockerfile @@ -0,0 +1,43 @@ +# vim: ts=4 sw=4 noet: + +# ------------------------------------------------------------------------------- +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------- + +# CI to verify the MC application components +# Inherits C toolchain from buildpack-deps:stretch then adds cmake and better shell(s). + +# It is assumed that this docker file is used with a build command run at the +# root level of the repo (directory containing the ci directory). E.g. +# docker build -f ci/Dockerfile . + +FROM buildpack-deps:stretch + +RUN apt-get update && apt-get -q -y install cmake ksh + +# stuff our repo things into a scratch area +RUN mkdir /playpen +ADD . /playpen + + +# add any unit test scripts that need to be driven, e.g. +# RUN ksh test/mcl_unit_test.ksh + +# This is a final/only script which might print useful things to the log and ALWAYS succeeds. +RUN ksh /playpen/ci/stats.ksh + +# there is no cmd; the build/verification assumes that if the image is created +# successfully, e.g. none of the previous run commands fail, that the environment +# is successfully vetted. diff --git a/ci/README b/ci/README new file mode 100644 index 0000000..abe687b --- /dev/null +++ b/ci/README @@ -0,0 +1,22 @@ + ------------------------------------------------------------------------------- + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------------------------------------------- + +These files are needed to support the continuous integration process +which verifies code on commit and might build images for the application. + +The Docker file is used to build an image during CI vetting, and maybe even +during the building of the overall application container. For vetting, +the successful build indicates that all vetting of the repo passed. diff --git a/ci/stats.ksh b/ci/stats.ksh new file mode 100644 index 0000000..563ee11 --- /dev/null +++ b/ci/stats.ksh @@ -0,0 +1,30 @@ + +# vim: ts=4 sw=4 noet: + +# ------------------------------------------------------------------------------- +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------- + +# ------------------------------------------------------------------------------- +# Mnemonic: stats.ksh +# Abstract: This script is run as the last vetting script during the CI +# process to possibly provide some stats about the verification. +# The script may be run as the only verification script during +# initial development. The script MUST always finish successfully +# Date: 24 August 2019 +# ------------------------------------------------------------------------------- + + +echo "$(date) finished verification and/or build" diff --git a/container-tag.yaml b/container-tag.yaml new file mode 100644 index 0000000..56e9ce4 --- /dev/null +++ b/container-tag.yaml @@ -0,0 +1,5 @@ + +# this provides the jenkins environment with a tag to use when +# building verification containers. +--- +tag: 1.0.0 diff --git a/src/sidecars/listener/.gitignore b/src/sidecars/listener/.gitignore new file mode 100644 index 0000000..874c63c --- /dev/null +++ b/src/sidecars/listener/.gitignore @@ -0,0 +1,2 @@ +*.o + diff --git a/src/sidecars/listener/Makefile b/src/sidecars/listener/Makefile new file mode 100644 index 0000000..36f7113 --- /dev/null +++ b/src/sidecars/listener/Makefile @@ -0,0 +1,60 @@ +# vim: ts=4 sw=4 noet: +#---------------------------------------------------------------------------------- +# +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +#--------------------------------------------------------------------------------- + +# this make file assuems that both NNG and RMR are installed and that the variables +# LD_LIBRARY_PATH, LIBRARY_PATH are set correctly. + +binaries = mc_listener +test_progs = sender unit_test pipe_reader +lib_obj = mcl.o +lib_h = mcl.h + +coverage_opts = -ftest-coverage -fprofile-arcs + +# make with no parms should just build 'production' binaries +all: $(binaries) mc_listener + +libmcl.a: $(lib_obj) $(lib_h) + ar -v -r libmcl.a $< + +mc_listener: mc_listener.c libmcl.a + gcc mc_listener.c -o mc_listener -L. -lmcl -lrmr_nng -lnng -lm -lpthread + + +# ---- testing stuff ----------------------------------------------------------------- +tests: $(test_progs) + +sender : sender.c + gcc sender.c -o sender -lrmr_nng -lnng -lm -lpthread + +pipe_reader : pipe_reader.c libmcl.a + gcc pipe_reader.c -o pipe_reader -L. -lmcl -lrmr_nng -lnng -lm -lpthread + +unit_test: unit_test.c mcl.c + gcc $(coverage_opts) unit_test.c -o unit_test -lrmr_nng -lnng -lm -lpthread + + +# ---- housekeeping stuff ------------------------------------------------------------- +# remove only intermediates +clean: + rm -f *.o *.gcda *.gcno *.gcov + +# remove anything that can be rebuilt +nuke: clean + rm -f *mcl.a $(binaries) $(test_progs) diff --git a/src/sidecars/listener/README b/src/sidecars/listener/README new file mode 100644 index 0000000..4b2da47 --- /dev/null +++ b/src/sidecars/listener/README @@ -0,0 +1,77 @@ + +-------------------------------------------------------------------------------- + + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +-------------------------------------------------------------------------------- + + + +MC Listener + +This directory contains the source for the a simple message listener which +writes messages received via RMR into a fifo (named pipe) for an external +process to consume. + +Fifos are named MT_xxxxxxxx where the Xs are replaced with the message +type with up to 7 leading zeros (e.g. MT_00000002). The data written +to a pipe has the form: + <8 bytes> + +Where the first 8 bytes are the ASCII representation of the length of +the message (n) (the 8th byte is a zero allowing the bytes to be +treated as a C string if desired. The next n bytes are the unchanged +payload which was received. No RMR header information (e.g. source, +meid, etc.) is communicated. + +If the listener is executed with the timestamp extension enabled, then +the leading 'header' is enhanced such that the time the message was +received by the listener is added. Additionally, a leading delimiter +is added to make synchronisation possible. The timestamp is also an +ASCII string of the form 1570110224356 (milliseconds past the epoch). +The enhanced header has the format: + + + +The is a series of 4 bytes which should always be: '@MCL'. +It is intended to be used to sequence "frames" in the pipe should there be +write errors which result in missing data. If the application reading from +a pipe does not see this delimeter, then it should read byte by byte from +the pipe until it does in order to synchronise with the stream. + +The are as descrbed previously: 8 byte ASCII string (nil +terminated). + +The is an ASCII string (nil terminated) with a length of 16 +bytes. + +The entire header will require 28 bytes. + + +There are multiple docker files; *.df. + mcl_runime.df -- builds an image with the runtime mc_listener binary + mcl_dev.df -- builds a development image that can be used to + interactively build and test the library and mc_listener + application. + +Unit testing +A very small set of unit tests are provided for the library functions in +mcl.c. Because of the nature of the fanout function, which blocks waiting +on RMR messages, it is not possible to unit test that bit of code. + +FIFO Reader +The pipe_reader programme is a simple application which uses the mcl.c +library functions to open and read from a single pipe. If the -e option +is given it will expect that data in the FIFO has extended headers. Use +the -? option (or -h) to generate a full usage statement. diff --git a/src/sidecars/listener/build_dev_env.sh b/src/sidecars/listener/build_dev_env.sh new file mode 100755 index 0000000..d0b241a --- /dev/null +++ b/src/sidecars/listener/build_dev_env.sh @@ -0,0 +1,98 @@ +#!/usr/bin/env bash +# vim: ts=4 sw=4 noet: +#-------------------------------------------------------------------------------- +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#-------------------------------------------------------------------------------- + + + +# --------------------------------------------------------------------------- +# Mnemonic: bld_dev_env.sh +# Abstract: This script is meant to be executed by a RUN command in +# a docker file. It fetches NNG, builds/installs it, and then +# does a wget of the desired RMR version's packages and installs +# them. RMR is fetched from package cloud. +# +# Date: 22 August 2019 +# Author: E. Scott Daniels +# --------------------------------------------------------------------------- + +rmr_ver=1.9.0 +nng_ver=v1.1.1 + +while [[ $1 == -* ]] +do + case $1 in + -n) nng_ver=$2; shift;; + -r) rmr_ver=$2; shift;; + esac + + shift +done + +if [[ $nng_ver != "v"* ]] +then + nng_ver="v$nng_ver" +fi + +set -e # from this point on crash on any error +mkdir -p /playpen/build/nng +cd /playpen/build/nng +git clone https://github.com/nanomsg/nng.git +cd nng +git checkout $nng_ver +mkdir .build +cd .build +echo "building nng (messages supressed unless there is an error)" + +set +e +if ! cmake .. >/tmp/cmake.nng.log 2>&1 +then + cat /tmp/cmake.nng.log + echo "" + echo "### ERROR ### NNG cmake configuration failed" + exit 1 +fi + +if ! make install >/tmp/build.nng.log 2>&1 +then + cat /tmp/build.nng.log + echo "" + echo "### ERROR ### NNG build failed" + exit 1 +fi + +set -e +echo "nng build finished ok" +cd /playpen +rm -fr /playpen/build/nng + +echo "installing RMR packages version = $rmr_ver" +mkdir -p /playpen/build/pkgs +cd /playpen/build/pkgs + +base_url=https://packagecloud.io/o-ran-sc/master/packages/debian/stretch/ +base_url=https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/ +pc_url=${base_url}rmr_${rmr_ver}_amd64.deb/download.deb +pc_dev_url=${base_url}rmr-dev_${rmr_ver}_amd64.deb/download.deb + +wget -q -O rmr.deb $pc_url +wget -q -O rmr-dev.deb $pc_dev_url + +ls -al +dpkg -i *.deb +cd /playpen +rm -fr /playpen/build/pkgs +echo "RMR package install finished" diff --git a/src/sidecars/listener/build_images.sh b/src/sidecars/listener/build_images.sh new file mode 100755 index 0000000..4fe1219 --- /dev/null +++ b/src/sidecars/listener/build_images.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# vim: ts=4 sw=4 noet: + +#-------------------------------------------------------------------------------- +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#-------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------------ +# Mnemonic: build_images.sh +# Abstract: This script will create both the mc_listener runtime and development +# images. +# Date: 22 August 2-19 +# Author: E. Scott Daniels +# ----------------------------------------------------------------------------------- + +skip_dev=1 +if [[ $1 == "all" ]] +then + skip_dev=0 + shift +fi + +if [[ $1 == "-?" || $1 == "-h" ]] +then + echo "usage: $0 [all] [mcl-version-tag [patch-level]]" + echo " using all as first keyword causes both runtime and dev images to build" + exit 0 +fi + + +ver=${1:-1.1} +patch=${2:-0} + +if (( skip_dev == 0 )) +then + echo "building development image" + docker build -f mcl_dev.df -t mcl_dev:$ver.$patch . +fi + +echo "building runtime image mc_listener:$ver" +if docker build -f mcl_runtime.df -t mc_listener:$ver.$patch . +then + echo "build finished" + echo "" + docker images|grep mc_ +fi + diff --git a/src/sidecars/listener/mc_listener.c b/src/sidecars/listener/mc_listener.c new file mode 100644 index 0000000..6b8da45 --- /dev/null +++ b/src/sidecars/listener/mc_listener.c @@ -0,0 +1,149 @@ +// vim: ts=4 sw=4 noet: +/* +-------------------------------------------------------------------------------- + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +-------------------------------------------------------------------------------- +*/ + +/* + Mnemonic: mc_listener.c + Abstract: This application (management campaign listener) will listen for + RMR based messages and write the payloads into FIFOs which + correspond to the message type. + + Defaults: + /var/lib/mc/listener -- directory for FIFOs + + Command line options: + -d FIFO directory (default is /tmp/mcl/fifos) + -p The port to set RMR listener on (default is 4560) + -r The frequency that count reports are written to + stderr. 0 == 0ff; default is 60. + + + RMR based environment variables which might be needed: + RMR_SEED_RT -- path to the static routing table + RMR_RTG_SVC -- port to listen for RTG connections + + Date: 22 August 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include + + +#include "mcl.h" + +//---- support ----------------------------------------------------------------------------- + +static void bad_arg( char* what ) { + fprintf( stderr, "[ERR] option is unrecognised or isn't followed by meaningful data: %s\n", what ); +} + +static void usage( char* argv0 ) { + fprintf( stderr, "usage: %s [-d fifo-dir] [-e] [-p listen-port] [-q | -r report-freq]\n", argv0 ); + fprintf( stderr, " -e enable extended header in buffers written to FIFOs\n" ); +} + +//------------------------------------------------------------------------------------------ +int main( int argc, char** argv ) { + void* ctx; // the mc listener library context + char* dname = "/tmp/mcl/fifos"; // default directory where we open fifos + char* port = "4560"; // default rmr port + char* siphon_dir = "/tmp/mci/siphon"; // where siphon files are placed + int siphon = 0; // percentage of messages to siphone off + int report_freq = 60; // report stats every n seconds + int pidx = 1; // parameter index + int error = 0; + int long_hdrs = 0; // -e sets and causes extended headers to be written + + while( pidx < argc && argv[pidx][0] == '-' ) { // simple argument parsing (-x or -x value) + switch( argv[pidx][1] ) { + case 'd': + if( pidx+1 < argc ) { + dname = strdup( argv[pidx+1] ); + pidx++; + } else { + bad_arg( argv[pidx] ); + error = 1; + } + break; + + case 'e': + long_hdrs = 1; + break; + + case 'p': + if( pidx+1 < argc ) { + port = strdup( argv[pidx+1] ); + pidx++; + } else { + bad_arg( argv[pidx] ); + error = 1; + } + break; + + case 'q': + report_freq = 0; + break; + + case 'r': + if( pidx+1 < argc ) { + report_freq = atoi( argv[pidx+1] ); + pidx++; + } else { + bad_arg( argv[pidx] ); + error = 1; + } + break; + + case 'h': + case '?': + usage( argv[0] ); + exit( 0 ); + break; + + default: + bad_arg( argv[pidx] ); + error = 1; + break; + } + + pidx++; + } + + if( error ) { + usage( argv[0] ); + exit( 1 ); + } + + ctx = mcl_mk_context( dname ); // initialise the library context + if( ctx == NULL ) { + fprintf( stderr, "[FAIL] couldn't initialise the mc listener environment\n" ); + exit( 1 ); + } + mcl_set_sigh(); // set signal handler(s) + + mcl_start_listening( ctx, port, MCL_NOWAIT ); // start the listener, no waiting for rt since we don't send + mcl_fifo_fanout( ctx, report_freq, long_hdrs ); // listen and fanout messages to fifo; report to stdout every ~2sec + + fprintf( stderr, "[INFO] mc listener is finished.\n" ); +} + diff --git a/src/sidecars/listener/mcl.c b/src/sidecars/listener/mcl.c new file mode 100644 index 0000000..06ceb82 --- /dev/null +++ b/src/sidecars/listener/mcl.c @@ -0,0 +1,568 @@ +// vim: ts=4 sw=4 noet: +/* +-------------------------------------------------------------------------------- + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +-------------------------------------------------------------------------------- +*/ + +/* + Mnemonic: mcl.c. + Abstract: The mc listener library content. All external functions + should start with mcl_ and all stderr messages should have + (mcl) as the first token following the severity indicator. + + Date: 22 August 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "mcl.h" + +#define READER 0 +#define WRITER 1 + +#define TRUE 1 +#define FALSE 0 + + +/* + Information about one file descriptor. This is pointed to by the hash + such that the message type can be used as a key to look up the fifo's + file descriptor. +*/ +typedef struct { + int fd; // open fdes + int key; // symtab key + long long wcount; // number of writes + long long drops; // number dropped + + long long wcount_rp; // number of writes during last reporting period + long long drops_rp; // number dropped during last reporting period +} fifo_t; + +/* + Our conext. Pointers to the read and write hash tables (both keyed on the message + type), the message router (RMR) context, and other goodies. +*/ +typedef struct { + void* mrc; // the message router's context + void* wr_hash; // symtable to look up pipe info based on mt for writing + void* rd_hash; // we support reading from pipes, but need a different FD for that + char* fifo_dir; // directory where we open fifos + +} mcl_ctx_t; + +// -------- private ------------------------------------------------------- + + +/* + Builds an extended header in the buffer provided, or allocates a new buffer if + dest is nil. The header is of the form: + + + Timestamp is a single unsigned long long in ASCII; ms since epoch. + If the current time is 2019/10/03 10:39:51.103 which is 1570113591.103 + the timestamp generated will be 1570113591103. +*/ +static char* build_hdr( int len, char* dest, int dest_len ) { + struct timespec ts; // time just before call executed + + if( dest == NULL ) { + dest_len = 48; + dest = (char *) malloc( sizeof( char ) * dest_len ); + } else { + if( dest_len < 28 ) { // shouldn't happen, but take no chances + memset( dest, 0, dest_len ); + return NULL; + } + } + + memset( dest, 0, dest_len ); + + clock_gettime( CLOCK_REALTIME, &ts ); + sprintf( dest, "%s%07d", MCL_DELIM, len ); + sprintf( dest+12, "%ld%03ld", ts.tv_sec, ts.tv_nsec/1000000 ); + + return dest; +} + +/* + Build a file name and open. The io_direction is either READER or + WRITER. For a writer we must 'trick' the system into allowing us + to open a pipe for writing in non-blocking mode so that we can + report on drops (messages we couldn't write because there was no + reader). The trick is to open a reader on the pipe so that when + we open the writer there's a reader and the open won't fail. As + soon as we have the writer open, we can close the junk reader. + + If the desired fifo does not exist, it is created. +*/ +static int open_fifo( mcl_ctx_t* ctx, int mtype, int io_dir ) { + char wbuf[1024]; + int fd; // real file des + int jfd = -1; // junk file des + int state; + + if( ctx == NULL || mtype < 0 ) { + return -1; + } + + snprintf( wbuf, sizeof( wbuf ), "%s/MT_%09d", ctx->fifo_dir, mtype ); + + state = mkfifo( wbuf, 0660 ); // make the fifo; this will fail if it exists and that's ok + if( state != 0 && errno != EEXIST ) { + fprintf( stderr, "[ERR] (mcl) unable to create fifo: %s: %s\n", wbuf, strerror( errno ) ); + return -1; + } + + if( io_dir == READER ) { + fd = open( wbuf, O_RDONLY ); // just open the reader + if( fd < 0 ) { + fprintf( stderr, "[ERR] (mcl) fifo open failed (ro): %s: %s\n", wbuf, strerror( errno ) ); + } + } else { + jfd = open( wbuf, O_RDWR | O_NONBLOCK ); // must have a reader before we can open a non-blocking writer + if( jfd < 0 ) { + fprintf( stderr, "[ERR] (mcl) fifo open failed (rw): %s: %s\n", wbuf, strerror( errno ) ); + } + + fd = open( wbuf, O_WRONLY | O_NONBLOCK ); // this will be our actual writer, in non-blocking mode + if( fd < 0 ) { + fprintf( stderr, "[ERR] (mcl) fifo open failed (wo): %s: %s\n", wbuf, strerror( errno ) ); + } + + close( jfd ); // should be safe to close this + } + + + return fd; +} + +/* + Given a message type, return the file des of the fifo that + the payload should be written to. Returns the file des, or -1 + on error. When sussing out a read file descriptor this will + block until there is a fifo for the message type which is + open for reading. + + If fref is not nil, then a pointer to the fifo info block is returned + allowing for direct update of counts after the write. +*/ +static int suss_fifo( mcl_ctx_t* ctx, int mtype, int io_dir, fifo_t** fref ) { + fifo_t* fifo; + void* hash; + + if( io_dir == READER ) { // with an integer key, we nned two hash tables + hash = ctx->rd_hash; + } else { + hash = ctx->wr_hash; + } + + if( (fifo = (fifo_t *) rmr_sym_pull( hash, mtype )) == NULL ) { + fifo = (fifo_t *) malloc( sizeof( *fifo ) ); + if( fifo == NULL ) { + return -1; + } + + memset( fifo, 0, sizeof( *fifo ) ); + fifo->key = mtype; + fifo->fd = open_fifo( ctx, mtype, io_dir ); + rmr_sym_map( hash, mtype, fifo ); + } + + if( fref != NULL ) { + *fref = fifo; + } + return fifo->fd; +} + +/* + Make marking counts easier in code +*/ +static inline void chalk_error( fifo_t* fifo ) { + if( fifo != NULL ) { + fifo->drops++; + fifo->drops_rp++; + } +} + +static inline void chalk_ok( fifo_t* fifo ) { + if( fifo != NULL ) { + fifo->wcount++; + fifo->wcount_rp++; + } +} + +/* + Callback function driven to traverse the symtab and generate the + counts for each fifo. +*/ +static void wr_stats( void* st, void* entry, char const* name, void* thing, void* data ) { + fifo_t* fifo; + int report_period = 60; + + if( data ) { + report_period = *((int *) data); + } + + if( (fifo = (fifo_t *) thing) != NULL ) { + fprintf( stdout, "[STAT] (mcl) mtype=%d total writes=%lld total drops=%lld; during last %ds writes=%lld drops=%lld\n", + fifo->key, fifo->wcount, fifo->drops, report_period, fifo->wcount_rp, fifo->drops_rp ); + + fifo->wcount_rp = 0; // reset the report counts + fifo->drops_rp = 0; + } +} + +// ---------- public ------------------------------------------------------ +/* + Sets a signal handler for sigpipe so we don't crash if a reader closes the + last reading fd on a pipe. We could do this automatically, but if the user + programme needs to trap sigpipe too, this gives them the option not to have + us interfere. +*/ +extern int mcl_set_sigh( ) { + signal( SIGPIPE, SIG_IGN ); +} + +/* + "Opens" the interface to RMR such that messages sent to the application will + be available via the rmr receive funcitons. This is NOT automatically called + by the mk_context function as some applications will be using the mc library + for non-RMR, fifo, chores. +*/ +extern int mcl_start_listening( void* vctx, char* port, int wait4ready ) { + mcl_ctx_t* ctx; + int announce = 0; + + if( (ctx = (mcl_ctx_t*) vctx) == NULL ) { + return 0; + } + + ctx->mrc = rmr_init( port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! + if( ctx->mrc == NULL ) { + fprintf( stderr, "[CRIT] unable to initialise RMr\n" ); + return 0; + } + + while( wait4ready && ! rmr_ready( ctx->mrc ) ) { // only senders need to wait + if( announce <= 0 ) { + fprintf( stderr, "[INFO] waiting for RMR to show ready\n" ); + announce = 10; + } else { + announce--; + } + + sleep( 1 ); + } + + return 1; +} + +/* + Blocks until a message arives with a good return code or we timeout. Returns the + rmr message buffer. Timeout value epxected in seconds. +*/ +extern rmr_mbuf_t* mcl_get_msg( void* vctx, rmr_mbuf_t* msg, int timeout ) { + mcl_ctx_t* ctx; + + if( (ctx = (mcl_ctx_t *) vctx) == NULL ) { + return NULL; + } + + if( ctx->mrc == NULL ) { + fprintf( stderr, "bad context\n" ); + exit( 1 ); + } + + do { + msg = rmr_torcv_msg( ctx->mrc, msg, timeout * 1000 ); // wait for next + } while( msg == NULL || (msg->state != RMR_OK && msg->state != RMR_ERR_TIMEOUT) ); + + return msg; +} + +/* + Create the context. +*/ +extern void* mcl_mk_context( char* dir ) { + mcl_ctx_t* ctx; + + if( (ctx = (mcl_ctx_t *) malloc( sizeof( *ctx ) )) != NULL ) { + memset( ctx, 0, sizeof( *ctx ) ); + ctx->fifo_dir = strdup( dir ); + ctx->wr_hash = rmr_sym_alloc( 1001 ); + ctx->rd_hash = rmr_sym_alloc( 1001 ); + + if( ctx->wr_hash == NULL || ctx->rd_hash == NULL ) { + fprintf( stderr, "[ERR] (mcl) unable to allocate hash table for fifo keys\n" ); + free( ctx ); + return NULL; + } + } + + return (void *) ctx; +} + +/* + Read the header. Best case we read the expected number of bytes, get all + of them and find that they start with the delemiter. Worst case + We have to wait for all of the header, or need to synch at the next + delimeter. We assume best case most likely and handle it as such. +*/ +static void read_header( int fd, char* buf ) { + int len; + int need = MCL_EXHDR_SIZE; // total needed + int dneed; // delimieter needed + int rlen; + char* rp; // read position in buf + + len = read( fd, buf, need ); + if( len == need && strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // best case, most likely + return; + } + + while( TRUE ) { + if( len < strlen( MCL_DELIM ) ) { // must get at least enough bytes to check delim + rp = buf + len; + dneed = strlen( MCL_DELIM ) - len; + + while( dneed > 0 ) { + len = read( fd, rp, dneed ); + dneed -= len; + rp += len; + } + } + + if( strncmp( buf, MCL_DELIM, strlen( MCL_DELIM )) == 0 ) { // have a good delimiter, just need to wait for bytes + need = MCL_EXHDR_SIZE - strlen( MCL_DELIM ); + rp = buf + (MCL_EXHDR_SIZE - need); + + while( need > 0 ) { + len = read( fd, rp, need ); + need -= len; + rp += len; + } + + return; + } + + while( buf[0] != MCL_DELIM[0] ) { // wait for a recognised start byte to be read (may cause an additional message drop + len = read( fd, buf, 1 ); // because we ignore start byte that might be in the buffer) + } + + need = MCL_EXHDR_SIZE - len; + } +} + + +/* + Read one record from the fifo that the message type maps to. + Writes at max ublen bytes into the ubuf. + + If long_hdrs is true (!0), then we expect that the stream in the fifo + has extended headers (