From 28b0c411053c155cb849cd269acb71ec701d79ac Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Mon, 17 Jun 2019 10:09:14 -0400 Subject: [PATCH] enhance(test): Add route generation simulator The simulator provids the means to generate one or more route tables which are delivered through the RMr route table collector thread. Signed-off-by: E. Scott Daniels Change-Id: I95d73ad5a0dc28c380bed2b9cb314a8b646429d8 Signed-off-by: E. Scott Daniels --- test/rtg_sim/BUILD | 38 +++ test/rtg_sim/CMakeLists.txt | 77 +++++ test/rtg_sim/README | 36 ++ test/rtg_sim/req_resp.c | 352 +++++++++++++++++++ test/rtg_sim/req_resp.h | 60 ++++ test/rtg_sim/rtm_sim.c | 803 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 1366 insertions(+) create mode 100644 test/rtg_sim/BUILD create mode 100644 test/rtg_sim/CMakeLists.txt create mode 100644 test/rtg_sim/README create mode 100644 test/rtg_sim/req_resp.c create mode 100644 test/rtg_sim/req_resp.h create mode 100644 test/rtg_sim/rtm_sim.c diff --git a/test/rtg_sim/BUILD b/test/rtg_sim/BUILD new file mode 100644 index 0000000..0f2feee --- /dev/null +++ b/test/rtg_sim/BUILD @@ -0,0 +1,38 @@ +# +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +To build the rtm_sim application, the expected CMake "process" is followed +and described below: + + mkdir .build + cd .build + cmake .. + make package + + +This will generate at least a .deb, and if the underlying system can support +it an RPM package as well. + + +Once installed, the rtm_sim application can be started from the command line: + rtm_sim config-file-name + +The config file defines the table(s) which are to be constructed and distributed +to applications. + diff --git a/test/rtg_sim/CMakeLists.txt b/test/rtg_sim/CMakeLists.txt new file mode 100644 index 0000000..6753dd3 --- /dev/null +++ b/test/rtg_sim/CMakeLists.txt @@ -0,0 +1,77 @@ + +# +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# simulate the route table manager +project( rtm_sim LANGUAGES C ) +cmake_minimum_required( VERSION 3.5 ) + +set( major_version "1" ) +set( minor_version "0" ) +set( patch_level "1" ) + +set( CMAKE_POSITION_INDEPENDENT_CODE ON ) +set( CMAKE_CXX_FLAGS "-g -Wall -I /usr/local/include" ) + +find_library( nanomsg libnanomsg.a ) + +find_program( rpm NAMES rpmbuild ) # rpm package gen requires this to be installed + +if( "${rpm}" MATCHES "rpm-NOTFOUND" ) # cannot build rpm + set( pkg_list "DEB" ) + message( "+++ `make package` will generate only deb package; cannot find support to generate rpm packages" ) +else() + set( pkg_list "DEB;RPM" ) + message( "+++ `make package` will generate both deb and rpm packages" ) +endif() + +add_executable( rtm_sim rtm_sim.c ) +target_link_libraries( rtm_sim nanomsg ) + +install( + TARGETS rtm_sim + DESTINATION /usr/local/bin +) + +IF( EXISTS "${CMAKE_ROOT}/Modules/CPack.cmake" ) + include( InstallRequiredSystemLibraries ) + + set( CPACK_set_DESTDIR "on" ) + set( CPACK_PACKAGING_INSTALL_PREFIX "${install_root}" ) + set( CPACK_GENERATOR ${pkg_list} ) + + set( CPACK_PACKAGE_DESCRIPTION "Simplistic route table manager simulator." ) + set( CPACK_PACKAGE_DESCRIPTION_SUMMARY "RT manager simulation" ) + set( CPACK_PACKAGE_VENDOR "None" ) + set( CPACK_PACKAGE_CONTACT "None" ) + set( CPACK_PACKAGE_VERSION_MAJOR "${major_version}" ) + set( CPACK_PACKAGE_VERSION_MINOR "${minor_version}" ) + set( CPACK_PACKAGE_VERSION_PATCH "${patch_level}" ) + set( CPACK_PACKAGE_FILE_NAME "rtm_sim-${major_version}.${minor_version}.${patch_level}-${CMAKE_SYSTEM_PROCESSOR}" ) + + # we build and ship the libraries, so there is NO dependency + set( CPACK_DEBIAN_PACKAGE_DEPENDS "libnanomsg0 (>=0.4)" ) + + set( CPACK_DEBIAN_PACKAGE_PRIORITY "optional" ) + set( CPACK_DEBIAN_PACKAGE_SECTION "test" ) + set( CPACK_DEBIAN_ARCHITECTURE ${CMAKE_SYSTEM_PROCESSOR} ) + set( CPACK_RPM_ARCHITECTURE ${CMAKE_SYSTEM_PROCESSOR} ) + + INCLUDE( CPack ) +ENDIF() diff --git a/test/rtg_sim/README b/test/rtg_sim/README new file mode 100644 index 0000000..2ab2b7c --- /dev/null +++ b/test/rtg_sim/README @@ -0,0 +1,36 @@ +# +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +The rtm_sim is an extremely simple application which reads a configuration +file, builds one or more route tables, and sends the table(s) to the +indicated applications. The applications are assumed to be RMr based +applications which are listening either on a well known port (4561) for +table updates, or on a port which is indicated in the config file. + +The sim is _not_ intended to be a complete replacement for any route +table mangager, but simply allows the ability to: + + 1) ensure that the RMr based application can receive real-time + table updates (thus is configured correctly with the needed + port properly exposed via the container environment if + applicable). + + 2) allow the generation of one or more route tables from a central + point during testing without having to run a more complicated + route manager for basic testing. diff --git a/test/rtg_sim/req_resp.c b/test/rtg_sim/req_resp.c new file mode 100644 index 0000000..9d58472 --- /dev/null +++ b/test/rtg_sim/req_resp.c @@ -0,0 +1,352 @@ +// :vi ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: req_rep.c + Abstract: A "library" module which allows a programme to easily be a requestor + or replier. Some functions are compatable with publishing (mbuf + allocation and management). Underlying we use the NN_PAIR and NOT + the req/rep model as that model is an inflexible, lock step, exchange + which does not lend well for a request that results in more than one + response messages, or no response. + + The user must be aware that once a session is established on the + host:port listener, another session will not be accepted until the + first is terminated; nano makes no provision for multiple concurrent + sesssions with either the PAIR or REQ/RESP models. + + We also support starting the publisher socket as the buffer and + send functions can be used for the publisher too. + + CAUTION: this is based on nanomsg, not NNG. The underlying protocols + are compatable, and because NNG has an emulation mode it is possible + to link successsfully with the nng library, BUT that will not + work here. Link only with nanomsg. + + Date: 18 January 2018 + Author: E. Scott Daniels + +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "req_resp.h" + +#define NULL_SOCKET 0 // fluff that is treated like a nil pointer check by coverage checker + + +/* + Connect to the host as a requestor. returns context if + successful. +*/ +extern void* rr_connect( char* host, char* port ) { + rr_ctx_t* ctx = NULL; + char wbuf[1024]; + int state; + + if( host == NULL || port == NULL ) { + errno = EINVAL; + return NULL; + } + + ctx = (rr_ctx_t *) malloc( sizeof *ctx ); + if( ctx == NULL ) { + errno = ENOMEM; + return NULL; + } + + //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR ); + ctx->nn_sock = nn_socket( AF_SP, NN_PUSH ); + if( ctx->nn_sock < NULL_SOCKET ) { + free( ctx ); + return NULL; + } + snprintf( wbuf, sizeof( wbuf ), "tcp://%s:%s", host, port ); + state = nn_connect( ctx->nn_sock, wbuf ); + if( state < 0 ) { + fprintf( stderr, "rr_conn: connect failed: %s: %d %s\n", wbuf, errno, strerror( errno ) ); + nn_close( ctx->nn_sock ); + free( ctx ); + return NULL; + } + + //fprintf( stderr, "rr_conn: connect successful: %s\n", wbuf ); + return (void *) ctx; +} + + +/* + Set up as a listener on any interface with the given port. +*/ +extern void* rr_start_listening( char* port ) { + rr_ctx_t* ctx; + char wbuf[1024]; + int state; + + if( port == NULL ) { + errno = EINVAL; + return NULL; + } + + ctx = (rr_ctx_t *) malloc( sizeof *ctx ); + if( ctx == NULL ) { + errno = EINVAL; + return NULL; + } + + //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR ); + ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); + if( ctx->nn_sock < NULL_SOCKET ) { + free( ctx ); + return NULL; + } + + snprintf( wbuf, sizeof( wbuf ), "tcp://0.0.0.0:%s", port ); + state = nn_bind( ctx->nn_sock, wbuf ); + if( state < 0 ) { + nn_close( ctx->nn_sock ); + free( ctx ); + return NULL; + } + + return (void *) ctx; +} + +/* + Configure and bind the publisher. Port is a string as it's probably read from + the command line, so no need to atoi() it for us. We can use the rr_* functions + for message buffers and sending, so we reuse their context rather than define our + own. + +*/ +extern void* open_publisher( char* port ) { + rr_ctx_t* pctx; + char conn_info[1024]; + + if( (pctx = (rr_ctx_t *) malloc( sizeof( *pctx )) ) == NULL ) { + return NULL; + } + + pctx->nn_sock = nn_socket( AF_SP, NN_PUB ); // publishing socket + if( pctx->nn_sock < 0 ) { + fprintf( stderr, "[CRI] unable to open publish socket: %s\n", strerror( errno ) ); + free( pctx ); + return NULL; + } + + snprintf( conn_info, sizeof( conn_info ), "tcp://0.0.0.0:%s", port ); // listen on any interface + if( nn_bind( pctx->nn_sock, conn_info ) < 0) { // bind and automatically accept client sessions + fprintf (stderr, "[CRI] unable to bind publising port: %s: %s\n", port, strerror( errno ) ); + nn_close ( pctx->nn_sock ); + free( pctx ); + return NULL; + } + + return (void *) pctx; +} + +extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len ) { + + if( ! mb ) { + mb = (rr_mbuf_t *) malloc( sizeof( *mb ) ); + mb->size = len; + mb->payload = NULL; + } else { + if( mb->size < len ) { // if requested len is larger than current payload + nn_freemsg( mb->payload ); + mb->payload = NULL; + } else { + len = mb->size; + } + } + mb->used = 0; + + if( len > 0 && !mb->payload ) { // allow a payloadless buffer to be allocated + mb->payload = nn_allocmsg( len, 0 ); + } + + return mb; +} + +/* + Closes the currently open session. +*/ +extern void rr_close( void* vctx ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + return; + } + + if( ctx->nn_sock < NULL_SOCKET ) { + return; + } + + nn_close( ctx->nn_sock ); + ctx->nn_sock = -1; +} + +extern void rr_free( void* vctx ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + return; + } + + rr_close( ctx ); + nn_term(); + free( ctx ); +} + +extern void rr_free_mbuf( rr_mbuf_t* mbuf ) { + if( mbuf->payload ) { + nn_freemsg( mbuf->payload ); + mbuf->payload = NULL; + mbuf->used = -2; // just in case they held a pointer and try to use it + } + + free( mbuf ); +} + +extern rr_mbuf_t* rr_receive( void* vctx, rr_mbuf_t* mbuf, int len ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return NULL; + } + if( ctx->nn_sock < 0 ) { + errno = ESTALE; // stale/bad socket fd + return NULL; + } + + mbuf = rr_new_buffer( mbuf, len ); + if( mbuf == NULL ) { + return NULL; + } + + *mbuf->payload = 0; + if( (mbuf->used = nn_recv( ctx->nn_sock, mbuf->payload, mbuf->size, 0 )) > 0 ) { + errno = 0; // nano doesn't seem to clear errno here + } + return mbuf; +} + +extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf ) { + rr_ctx_t* ctx; + int len; + int state; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return NULL; + } + + if( ctx->nn_sock < 0 ) { + errno = ESTALE; // stale/bad socket fd + return NULL; + } + + if( ! mbuf ) { + errno = ENOBUFS; // not quite right, but close enough + return NULL; + } + + if( ! mbuf->payload ) { // no payload???? + errno = EFAULT; // nil is a bad address after all :) + return mbuf; + } + + errno = 0; + //fprintf( stderr, "rrsend is sending %d bytes....\n", mbuf->used ); + if( (state = nn_send( ctx->nn_sock, &mbuf->payload, NN_MSG, 0 )) > 0 ) { + //fprintf( stderr, "send ok to %d: %d %s\n", ctx->nn_sock, state, strerror( errno ) ); + mbuf->used = 0; + if( alloc_buf ) { + mbuf->payload = nn_allocmsg( mbuf->size, 0 ); // allocate the next send buffer + } else { + mbuf->payload = NULL; + mbuf->used = -1; + } + + errno = 0; + } else { + fprintf( stderr, "send failed %d %s\n", state, strerror( errno ) ); + } + + return mbuf; +} + +/* + Set the receive timeout to time. If time >100 we assume the time is milliseconds, + else we assume seconds. Setting -1 is always block. + Returns the nn value (0 on success <0 on error). +*/ +extern int rr_rcv_to( void* vctx, int time ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return -1; + } + + if( time > 0 ) { + if( time < 100 ) { + time = time * 1000; // assume seconds, nn wants ms + } + } + + return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) ); +} + +/* + Set the send timeout to time. If time >100 we assume the time is milliseconds, + else we assume seconds. Setting -1 is always block. + Returns the nn value (0 on success <0 on error). +*/ +extern int rr_send_to( void* vctx, int time ) { + rr_ctx_t* ctx; + + if( (ctx = (rr_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return -1; + } + + if( time > 0 ) { + if( time < 100 ) { + time = time * 1000; // assume seconds, nn wants ms + } + } + + return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) ); +} + diff --git a/test/rtg_sim/req_resp.h b/test/rtg_sim/req_resp.h new file mode 100644 index 0000000..0473c8b --- /dev/null +++ b/test/rtg_sim/req_resp.h @@ -0,0 +1,60 @@ +// :vi ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +#ifndef _req_resp_h +#define _req_resp_h + +/* + A message buffer that can be used for either req/resp context setup + or for publishing in a pub/sub setup. +*/ +typedef struct rr_mbuf { + int used; // bytes actually used + int size; // allocated size of payload + char* payload; +} rr_mbuf_t; + + +/* + A 'context' to interface with nano; so far very simple, but could + expand, so we use the struct. +*/ +typedef struct rr_ctx { + int nn_sock; +} rr_ctx_t; + + +// ---- prototypes for the rr library ---------------------- +// vctx is the pointer returned by the connect or start listening functions +// and is passed to nearly all other functions. + +extern void* rr_connect( char* host, char* port ); +extern void* rr_start_listening( char* port ); +extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len ); +extern void rr_close( void* vctx ); +extern void rr_free( void* vctx ); +extern void rr_free_mbuf( rr_mbuf_t* mbuf ); +extern void* open_publisher( char* port ); +extern int rr_rcv_to( void* vctx, int time ); +extern int rr_send_to( void* vctx, int time ); +extern rr_mbuf_t* rr_receive( void* vctx, rr_mbuf_t* mbuf, int len ); +extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf ); + +#endif diff --git a/test/rtg_sim/rtm_sim.c b/test/rtg_sim/rtm_sim.c new file mode 100644 index 0000000..c52bb2d --- /dev/null +++ b/test/rtg_sim/rtm_sim.c @@ -0,0 +1,803 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: rtm_sim.c + Abstract: This is a simple route manager simulation which provides the ability + to push a route table into one or more xAPPs. Designed just to + drive the internal RMr route table collector outside of the static + file allowing for testing of port definition in some containerised + environments. + + This application does not persist; it generates a set of tables based + on the config file, connects to all applications listed, distributes + the table, and exits. If periodic delivery of one or more different + configurations needs to be executed, use a shell script to wrap this + application in a loop. + + Date: 14 June 2019 + Author: E. Scott Daniels +*/ + +/* +config file format: + # comment and blank lines allowed + # trailing comments allowed + + # port is used for any app listed in send2 which does not have a trailing :port + # it may be supplied as a different value before each table, and if not + # redefined applies to all subsequent tables. + # + + # A table consists of a send2 list (app[:port]) which are the applications that will + # receive the table. Each table may contain one or more entries. Entries define + # the message type and subscription ID, along with one or more round robin groups. + # A rrgroup is one or more app:port "endpoints" which RMr will use when sending + # messages of the indicated type/subid. Port on a rrgroup is rquired and is the + # port that the application uses for app to app communications. + # + + port: xapp-rtg-listen-port # 4561 default + table: + send2: app1:port app2:port ... appn:port + entry: + mtype: n + subid: n + rrgroup: app:port... app:port + + entry: + mtype: n + subid: n + rrgroup: app:port ... app:port +*/ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "req_resp.c" // simple nano interface for request/response connections + + +#define CONNECTED 1 // we've established a shunt connection to the app + +#define TRUE 1 +#define FALSE 0 + +#define ALLOC_NEW 1 // rrsend should allocate a new buffer + +#define MAX_TABLES 16 // total tables we support +#define MAX_SEND2 64 // max number of apps that a table can be sent to +#define MAX_APPS 1024 // max total apps defined (tables * send2) +#define MAX_GROUPS 64 // max num of round robin groups per entry +#define MAX_RRG_SIZE 64 // max number of apps in a group +#define MAX_ENTRIES 256 // max entries in a table +#define MAX_TOKENS 512 // max tokens we'll break a buffer into + + +// --------------------------------------------------------------------------------------- + +/* + Things we need to track for an application. +*/ +typedef struct app { + void* shunt; // the rr context to shunt directly to an app + int state; // connected or not + char* name; // IP address or DNS name and port for connecting + char* port; // rr wants two strings as it builds it's own NN string +} app_t; + + + +// ----- table stuff (very staticly sized, but this isn't for prod) ---------------------- +/* + A round robin group in a table entry +*/ +typedef struct rrgroup { + int napps; // number of apps + char* apps[MAX_RRG_SIZE]; +} rrgroup_t; + +/* + A single table entry. +*/ +typedef struct entry { + int mtype; // entry message type + int subid; // entry sub id + int ngroups; + rrgroup_t groups[MAX_GROUPS]; // the entry's groups +} entry_t; + +/* + Defines a table which we will distribute. +*/ +typedef struct table { + int napps; // number of apps this table is sent to + int first_app; // first app in minfo that we send to + int nentries; // number of entries + entry_t entries[MAX_ENTRIES]; +} table_t; + +/* + Master set of contextual information. +*/ +typedef struct master { + int napps; // number in use (next insert point) + int ntables; + int port; // the port applications open by default for our connections + app_t apps[MAX_APPS]; + table_t tables[MAX_TABLES]; +} master_t; + +/* + Record buffer; file in memory which can be iterated over a record at a time. +*/ +typedef struct rbuffer { + char* buffer; // stuff read from file + char* rec; // next record + int at_end; // true if end was reached +} rbuffer_t; + +/* + Set of tokens. +*/ +typedef struct tokens { + char* buffer; // buffer that tokens points into + int ntoks; // number of tokens in tokens + char* tokens[MAX_TOKENS]; // pointers into buffer at the start of each token 0..ntokens-1 +} tokens_t; + + +// ----- token utilities ------------------------------------------------------------ + +/* + Frees a token manager. +*/ +static void free_tokens( tokens_t* t ) { + if( t == NULL ) { + return; + } + + free( t ); +} + +/* + Simple tokeniser. If sep is whitespace, then leading whitespace (all, not just + sep) is ignored; if sep is not whitespace, then leadign whitespace is included + in the first token. If sep is whitespace, consecutive instances of whitespace + are treated as a single seperator: + if sep given as space, then + "bug boo" and "bug boo" both generate two tokens: (bug) (boo) + + if sep given as pipe (|), then + "bug||boo" generates three tokens: (bug), (), (boo) + + Each token is a zero terminated string. + +*/ +static tokens_t* tokenise( char* buf, char sep ) { + tokens_t* t; + int i; + char end_sep; // if quoted endsep will be the quote mark + + if( !buf || !(*buf) ) { + return NULL; + } + + t = (tokens_t *) malloc( sizeof( *t ) ); + memset( t, 0, sizeof( *t ) ); + + t->buffer = strdup( buf ); + buf = t->buffer; // convenience + + if( isspace( sep ) ) { // if sep is in whitespace class + while( buf != NULL && *buf && isspace( *buf ) ) { // pass over any leading whitespace + buf++; + } + + for( i = 0; i < strlen( buf ); i++ ) { + if( buf[i] == '\t' ) { + buf[i] = ' '; + } + } + } + + while( buf != NULL && t->ntoks < MAX_TOKENS && *buf ) { + if( *buf == '"' ) { + end_sep = '"'; + buf++; + + } else { + end_sep = sep; + } + + t->tokens[t->ntoks++] = buf; // capture token start + + if( (buf = strchr( buf, end_sep )) != NULL ) { // find token end + *(buf++) = 0; + + if( end_sep != sep ) { + buf++; + } + + if( isspace( sep ) ) { // treat consec seperators as one if sep is whitespace + while( *buf == sep ) { + buf++; + } + } + } + } + + return t; +} + +// ----- file/record management utilities ------------------------------------------------------------ + +/* + Read an entire file into a single buffer. +*/ +static char* f2b( char* fname ) { + struct stat stats; + off_t fsize = 8192; // size of the file + off_t nread; // number of bytes read + int fd; + char* buf; // input buffer + + if( (fd = open( fname, O_RDONLY )) >= 0 ) { + if( fstat( fd, &stats ) >= 0 ) { + if( stats.st_size <= 0 ) { // empty file + close( fd ); + fd = -1; + } else { + fsize = stats.st_size; // stat ok, save the file size + } + } else { + fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k + } + } + + if( fd < 0 ) { // didn't open or empty + if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) { + return NULL; + } + + *buf = 0; + return buf; + } + + if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string + close( fd ); + errno = ENOMEM; + return NULL; + } + + nread = read( fd, buf, fsize ); + if( nread < 0 || nread > fsize ) { // failure of some kind + free( buf ); + errno = EFBIG; // likely too much to handle + close( fd ); + return NULL; + } + + buf[nread] = 0; + + close( fd ); + return buf; +} + +/* + Read a file into a buffer, and set a record buffer to manage it. +*/ +static rbuffer_t* f2r( char* fname ) { + char* raw; // raw buffer + rbuffer_t* r; + + if( (raw = f2b( fname )) == NULL ) { + return NULL; + } + + r = (rbuffer_t *) malloc( sizeof( *r ) ); + memset( r, 0, sizeof( *r ) ); + r->buffer = raw; + r->rec = raw; // point at first (only) record +} + +/* + Return a pointer to the next record in the buffer, or nil if at + end of buffer. +*/ +static char* next_rec( rbuffer_t* r ) { + char* rec; + + if( !r || r->at_end ) { + return NULL; + } + + rec = r->rec; + r->rec = strchr( r->rec, '\n' ); + if( r->rec ) { + *r->rec = 0; + r->rec++; + if( *r->rec == 0 ) { + r->at_end = TRUE; + } + } else { + r->at_end = TRUE; // mark for next call + } + + return rec; +} + +static void free_rbuf( rbuffer_t* r ) { + if( r != NULL ) { + if( r->buffer != NULL ) { + free( r->buffer ); + } + + free( r ); + } +} + + +// ----- table management ------------------------------------------------------------ + +/* + Run the app list and attempt to open a shunt to any unconnected application. + Returns the number of applications that could not be connected to. +*/ +static int connect2all( master_t* mi ) { + int errors = 0; + int i; + + + if( mi == NULL ) { + return 1; + } + + for( i = 0; i < mi->napps; i++ ) { + if( mi->apps[i].state != CONNECTED ) { + fprintf( stderr, "[INF] opening shunt to: %s:%s\n", mi->apps[i].name, mi->apps[i].port ); + mi->apps[i].shunt = rr_connect( mi->apps[i].name, mi->apps[i].port ); + if( mi->apps[i].shunt == NULL) { + errors++; + } else { + fprintf( stderr, "[INFO] shunt created to: %s:%s\n", mi->apps[i].name, mi->apps[i].port ); + mi->apps[i].state = CONNECTED; + } + } + } + + return errors; +} + +/* + Add an application to the current table. +*/ +static void add_app( master_t* mi, char* app_name ) { + char wbuf[256]; + char* app_port; + char* ch; + + if( mi == NULL || app_name == NULL ) { + return; + } + + if( mi->napps > MAX_APPS ) { + fprintf( stderr, "[WARN] too many applications, ignoring: %s\n", app_name ); + } + + if( (ch = strchr( app_name, ':' )) == NULL ) { // assume we are using the default rm listen port + snprintf( wbuf, sizeof( wbuf ), "%d", mi->port ); + app_port = wbuf; + } else { + *(ch++) = 0; // name port given, split and point at port + app_port = ch; + } + + mi->apps[mi->napps].name = strdup( app_name ); + mi->apps[mi->napps].port = strdup( app_port ); + mi->apps[mi->napps].state = !CONNECTED; + mi->napps++; +} + + +/* + Initialise things; returns a master info context. +*/ +static master_t* init( char* cfname ) { + master_t* mi; + char wbuf[128]; + rbuffer_t* rb; // record manager for reading config file + char* rec; + tokens_t* tokens; + int i; + int errors; + char* tok; + table_t* table = NULL; + rrgroup_t* rrg = NULL; + entry_t* entry = NULL; + int rec_num = 0; + + mi = (master_t *) malloc( sizeof( *mi ) ); + if( mi == NULL ) { + return NULL; + } + memset( mi, 0, sizeof( *mi ) ); + mi->port = 4561; + + rb = f2r( cfname ); // get a record buffer to parse the config file + if( rb == NULL ) { + fprintf( stderr, "[FAIL] unable to open config file: %s: %s\n", cfname, strerror( errno ) ); + free( mi ); + return NULL; + } + + while( (rec = next_rec( rb )) != NULL ) { + if( *rec ) { + rec_num++; + tokens = tokenise( rec, ' ' ); + + fprintf( stderr, "parsing %d: %s\n", rec_num, rec ); + + for( i = 0; i < tokens->ntoks && *tokens->tokens[i] != '#'; i++ ); // simple comment strip + tokens->ntoks = i; + + if( tokens->ntoks > 0 ) { + tok = tokens->tokens[0]; + switch( *tok ) { // faster jump table based on 1st ch; strcmp only if needed later + case '#': + break; + + case 'e': + if( table != NULL && table->nentries < MAX_ENTRIES ) { + entry = &table->entries[table->nentries++]; + entry->subid = -1; // no subscription id if user omits + } else { + fprintf( stderr, "[ERR] @%d no table started, or table full\n", rec_num ); + } + break; + + case 'm': + if( entry != NULL ) { + entry->mtype = atoi( tokens->tokens[1] ); + } else { + fprintf( stderr, "[ERR] @%d no entry started\n", rec_num ); + } + break; + + case 'p': + if( tokens->ntoks > 1 ) { + mi->port = atoi( tokens->tokens[1] ); + if( mi->port < 1000 ) { + fprintf( stderr, "[WRN] @%d assigned default xAPP port smells fishy: %s\n", rec_num, tokens->tokens[1] ); + } + } + break; + + case 'r': // round robin group + if( entry != NULL && entry->ngroups < MAX_GROUPS ) { + if( tokens->ntoks < MAX_RRG_SIZE ) { + rrg = &entry->groups[entry->ngroups++]; + + for( i = 1; i < tokens->ntoks; i++ ) { + rrg->apps[rrg->napps++] = strdup( tokens->tokens[i] ); + } + } else { + fprintf( stderr, "[ERR] @%d round robin group too big.\n", rec_num ); + } + } else { + fprintf( stderr, "[ERR] @%d no previous entry, or entry is full\n", rec_num ); + } + break; + + case 's': + if( *(tok+1) == 'e' ) { // send2 + if( table != NULL && tokens->ntoks < MAX_SEND2 ) { + table->first_app = mi->napps; + + for( i = 1; i < tokens->ntoks; i++ ) { + add_app( mi, tokens->tokens[i] ); + } + + table->napps = tokens->ntoks - 1; + } + } else { // subid + if( entry != NULL ) { + entry->subid = atoi( tokens->tokens[1] ); + } else { + fprintf( stderr, "[ERR] @%d no entry started\n", rec_num ); + } + } + break; + + case 't': + entry = NULL; + if( mi->ntables < MAX_TABLES ) { + table = &mi->tables[mi->ntables++]; + } else { + fprintf( stderr, "[ERR] @%d too many tables defined\n", rec_num ); + table = NULL; + } + break; + + default: + fprintf( stderr, "record from config was ignored: %s\n", rec ); + break; + } + } + } + } + + free_rbuf( rb ); + return mi; +} + + +/* + Build a buffer with the entry n from table t. Both entry and table + numbers are 0 based. + Caller must free returned buffer. +*/ +static char* mk_entry( master_t* mi, int table, int entry ) { + char wbuf[4096]; + char sbuf[256]; + table_t* tab; + entry_t* ent; + int i; + int j; + int len; + int alen; + + if( !mi || mi->ntables <= table ) { + return NULL; + } + + tab = &mi->tables[table]; + if( tab->nentries <= entry ) { + return NULL; + } + + ent = &tab->entries[entry]; + + snprintf( wbuf, sizeof( wbuf ), "mse | %d | %d | ", ent->mtype, ent->subid ); // we only generate mse records + + len = strlen( wbuf ); + for( i = 0; i < ent->ngroups; i++ ) { + if( i ) { + strcat( wbuf, "; " ); + } + + for( j = 0; j < ent->groups[i].napps; j++ ) { + alen = strlen( ent->groups[i].apps[j] ) + 3; // not percise, but close enough for testing + if( alen + len > sizeof( wbuf ) ) { + fprintf( stderr, "[ERR] entry %d for table %d is too large to format\n", entry, table ); + return NULL; + } + + if( j ) { + strcat( wbuf, "," ); + } + + strcat( wbuf, ent->groups[i].apps[j] ); + } + } + + strcat( wbuf, "\n" ); + return strdup( wbuf ); +} + +/* + Sends a buffer to all apps in the range. +*/ +void send2range( master_t* mi, char* buf, int first, int n2send ) { + int a; // application offset in master array + int last; // stopping point (index) + rr_mbuf_t* mbuf = NULL; + + if( !mi ) { // safe to dance + return; + } + + a = first; + last = first + n2send; + + mbuf = rr_new_buffer( mbuf, strlen( buf ) + 5 ); // ensure buffer is large enough + while( a < last ) { + fprintf( stderr, "%s ", mi->apps[a].name ); + + memcpy( mbuf->payload, buf, strlen( buf ) ); + mbuf->used = strlen( buf ); + mbuf = rr_send( mi->apps[a].shunt, mbuf, ALLOC_NEW ); + + a++; + + fprintf( stderr, "\n" ); + } + + rr_free_mbuf( mbuf ); +} + +/* + Formats the entries for the table and sends to all applications that the + table should be sent to per the send2 directive in the config. +*/ +static void send_table( master_t* mi, int table ) { + table_t* tab; + char* ent; // entry string to send + int e = 0; // entry number + int a; + int last; + rr_mbuf_t* mbuf = NULL; + + if( !mi || table > mi->ntables ) { + return; + } + + tab = &mi->tables[table]; + fprintf( stderr, "[INF] send table start message: " ); + send2range( mi, "newrt | start\n", tab->first_app, tab->napps ); // send table start req to all + + while( (ent = mk_entry( mi, table, e++ )) != NULL ) { // build each entry once, send to all + fprintf( stderr, "[INF] sending table %d entry %d: ", table, e ); + send2range( mi, ent, tab->first_app, tab->napps ); + } + + fprintf( stderr, "[INF] send table end message: " ); + send2range( mi, "newrt | end\n", tab->first_app, tab->napps ); // send table end notice to all +} + +/* + Run the list of tables and send them all out. +*/ +static void send_all_tables( master_t* mi ) { + int i; + + if( ! mi ) { + return; + } + + for( i = 0; i < mi->ntables; i++ ) { + send_table( mi, i ); + } +} + + +// ----------------- testing ---------------------------------------------------------------------- + +/* + Dump table entries in the form we'll send to apps to stderr. +*/ +static void print_tables( master_t* mi ) { + char* ent; + int t; + int e; + + if( ! mi ) { + return; + } + + for( t = 0; t < mi->ntables; t++ ) { + fprintf( stderr, "=== table %d ===\n", t ); + e = 0; + while( (ent = mk_entry( mi, t, e++ )) != NULL ) { + fprintf( stderr, "%s", ent ); + free( ent ); + } + } +} + +static void print_tokens( tokens_t* tokens ) { + int i; + + fprintf( stderr, "there are %d tokens\n", tokens->ntoks ); + for( i = 0; i < tokens->ntoks; i++ ) { + fprintf( stderr, "[%02d] (%s)\n", i, tokens->tokens[i] ); + } +} + +static void self_test( char* fname ) { + char* s0 = " Now is the time for all to stand up and cheer!"; + char* s1 = " Now is \"the time for all\" to stand up and cheer!"; + char* s2 = " field1 | field2||field4|field5"; + tokens_t* tokens; + rbuffer_t *rb; + int i; + + tokens = tokenise( s0, ' ' ); + if( tokens->ntoks != 11 ) { + fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 ); + } + print_tokens( tokens ); + + tokens = tokenise( s1, ' ' ); + if( tokens->ntoks != 8 ) { + fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 ); + } + print_tokens( tokens ); + + tokens = tokenise( s2, '|' ); + if( tokens->ntoks != 5 ) { + fprintf( stderr, "didn't parse into 5 tokens (got %d): %s\n", tokens->ntoks, s2 ); + } + print_tokens( tokens ); + + free_tokens( tokens ); + + if( fname == NULL ) { + return; + } + + rb = f2r( fname ); + if( rb == NULL ) { + fprintf( stderr, "[FAIL] couldn't read file into rbuffer: %s\n", strerror( errno ) ); + } else { + while( (s2 = next_rec( rb )) != NULL ) { + fprintf( stderr, "record: (%s)\n", s2 ); + } + + free_rbuf( rb ); + } + + return; +} + + +// ---------------------------------------------------------------------------------------------- + +int main( int argc, char** argv ) { + void* mi; + int not_ready; + + if( argc > 1 ) { + if( strcmp( argv[1], "selftest" ) == 0 ) { + self_test( argc > 1 ? argv[2] : NULL ); + exit( 0 ); + } + + mi = init( argv[1] ); // parse the config and generate table structs + if( ! mi ) { + fprintf( stderr, "[CRI] initialisation failed\n" ); + exit( 1 ); + } + + print_tables( mi ); + + while( (not_ready = connect2all( mi ) ) > 0 ) { + fprintf( stderr, "[INF] still waiting to connect to %d applications\n", not_ready ); + sleep( 2 ); + } + + fprintf( stderr, "[INF] connected to all applications, sending tables\n" ); + + send_all_tables( mi ); + } else { + fprintf( stderr, "[INFO] usage: %s file-name\n", argv[0] ); + } +} + -- 2.16.6