From: Agustin F. Pozuelo Date: Mon, 31 May 2021 15:28:47 +0000 (+0100) Subject: Feed VIAVI data into E2 Simulator via pipe X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?p=sim%2Fe2-interface.git;a=commitdiff_plain;h=88de94233a1b3b09cc91e3b557c825ac1a80dacb Feed VIAVI data into E2 Simulator via pipe Signed-off-by: Agustin F. Pozuelo Change-Id: Ia3e1373764a1118d88876df9b9254fbb5c31d084 --- diff --git a/e2sim/docker/Dockerfile b/e2sim/docker/Dockerfile index 3721230..2325892 100644 --- a/e2sim/docker/Dockerfile +++ b/e2sim/docker/Dockerfile @@ -14,7 +14,8 @@ # limitations under the License. #================================================================================== -FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as buildenv +ARG CONTAINER_PULL_REGISTRY=nexus3.o-ran-sc.org:10001 +FROM ${CONTAINER_PULL_REGISTRY}/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as buildenv RUN mkdir /playpen RUN apt-get update && apt-get install -y build-essential git cmake libsctp-dev autoconf automake libtool bison flex libboost-all-dev diff --git a/e2sim/e2sm_examples/kpm_e2sm/Dockerfile b/e2sim/e2sm_examples/kpm_e2sm/Dockerfile index 709d6e5..9735353 100644 --- a/e2sim/e2sm_examples/kpm_e2sm/Dockerfile +++ b/e2sim/e2sm_examples/kpm_e2sm/Dockerfile @@ -31,7 +31,8 @@ # the builder has: git, wget, cmake, gcc/g++, make, python2/3. v7 dropped nng support # -FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:8-u18.04 as buildenv +ARG CONTAINER_PULL_REGISTRY=nexus3.o-ran-sc.org:10001 +FROM ${CONTAINER_PULL_REGISTRY}/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as buildenv # spaces to save things in the build image to copy to final image RUN mkdir -p /playpen/assets /playpen/src /playpen/bin diff --git a/e2sim/e2sm_examples/kpm_e2sm/src/kpm/kpm_callbacks.cpp b/e2sim/e2sm_examples/kpm_e2sm/src/kpm/kpm_callbacks.cpp index 18081f3..26958b6 100644 --- a/e2sim/e2sm_examples/kpm_e2sm/src/kpm/kpm_callbacks.cpp +++ b/e2sim/e2sm_examples/kpm_e2sm/src/kpm/kpm_callbacks.cpp @@ -50,6 +50,8 @@ extern "C" { #include #include +#include "viavi_connector.hpp" + using json = nlohmann::json; using namespace std; @@ -146,12 +148,19 @@ void get_cell_id(uint8_t *nrcellid_buf, char *cid_return_buf) { } -void run_report_loop(long requestorId, long instanceId, long ranFunctionId, long actionId) { +void run_report_loop(long requestorId, long instanceId, long ranFunctionId, long actionId) +{ + std::filebuf reports_json; + std::streambuf *input_filebuf = &reports_json; - std::ifstream input("/playpen/src/reports.json"); - bool x = input.good(); + std::unique_ptr viavi_connector; + if (!reports_json.open("/playpen/src/reports.json", std::ios::in)) { + std::cerr << "Can't open reports.json, enabling VIAVI connector instead..." << endl; + viavi_connector.reset(new viavi::RICTesterReceiver {3001, nullptr}); + input_filebuf = viavi_connector->get_data_filebuf(); + } - fprintf(stderr, "%s\n", x ? "true" : "false"); + std::istream input {input_filebuf}; long seqNum = 1; diff --git a/e2sim/e2sm_examples/kpm_e2sm/src/kpm/viavi_connector.hpp b/e2sim/e2sm_examples/kpm_e2sm/src/kpm/viavi_connector.hpp new file mode 100644 index 0000000..8c152b0 --- /dev/null +++ b/e2sim/e2sm_examples/kpm_e2sm/src/kpm/viavi_connector.hpp @@ -0,0 +1,200 @@ +// VIAVI RIC Tester streaming data input and closed loop +// +// Maintainer: agustin.pozuelo@viavisolutions.com +// +// Copyright 2020 VIAVI Solutions +// +// Licensed under the Apache License, Version 2.0 (the "License") +// you may not use this file except in compliance with the License +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + + +namespace viavi { + +using namespace std::chrono_literals; +using Clock = std::chrono::steady_clock; +using json = nlohmann::json; + +void panic(const char* why) +{ + throw std::runtime_error(std::string("RIC tester: ") + why + + ". ERRNO " + std::to_string(errno) + ": " + ::strerror(errno)); +} + +static int sample_data_callback(json& message) +{ + std::stringstream ss; ss << "RIC tester thread " << std::this_thread::get_id(); + auto I = ss.str(); + int report_count = 0; + if (message.contains("ueMeasReport")) { + json reports = message["ueMeasReport"]["ueMeasReportList"]; + + message["ueMeasReport"].erase("ueMeasReportList"); + std::clog << I << " got " << reports.size() << " UE reports: " << message["ueMeasReport"]; + + for (auto& report: reports) { + ++report_count; + int serving_cell = report["nrCellIdentity"]; + int rsrp = report["servingCellRfReport"]["rsrp"]; + } + } + if (message.contains("cellMeasReport")) { + json reports = message["cellMeasReport"]["cellMeasReportList"]; + + message["cellMeasReport"].erase("cellMeasReportList"); + std::clog << I << " got " << reports.size() << " cell reports: " << message["cellMeasReport"]; + + for (auto& report: reports) { + ++report_count; + int serving_cell = report["nrCellIdentity"]; + int throughput = report["throughput"]; + } + } + return report_count; +} + +class RICTesterReceiver +{ + public: + RICTesterReceiver(int portno = 3001, std::function data_callback = sample_data_callback) + { + listen_fd = socket(AF_INET, SOCK_STREAM, 0); + int enabled = 1; + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) < 0) + panic("ERROR opening socket"); + struct sockaddr_in serv_addr = { + .sin_family = AF_INET, + .sin_port = htons(portno), + .sin_addr = { .s_addr = INADDR_ANY} + }; + if (bind(listen_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) + panic("Bind error"); + + printf("Listening on TCP 0.0.0.0:%d...\n", portno); + listen(listen_fd, 3); // Max 3 pending connections + + set_callback(data_callback); + threads_.emplace_back([this] {server();}); + } + + void set_callback(std::function data_callback) + { + data_callback_ = data_callback; + if (!data_callback && data_pipe_[0] == STDIN_FILENO) { + printf("No data callback provided, creating data pipe...\n"); + if (pipe(data_pipe_)) + panic("Cannot create data pipe"); + data_filebuf_ = __gnu_cxx::stdio_filebuf {data_pipe_[0], std::ios::in}; + } + } + + std::basic_filebuf* get_data_filebuf() + { + return &data_filebuf_; + } + + ~RICTesterReceiver() + { + alive = false; + for (auto& t: threads_) + t.join(); + } + + protected: + int listen_fd; + std::vector threads_; + bool alive = true; + std::function data_callback_; + int data_pipe_[2] = {STDIN_FILENO, STDOUT_FILENO}; // Default to stdin, stdout + __gnu_cxx::stdio_filebuf data_filebuf_; + + void server() + { + while (alive) + { + int connection_fd = accept(listen_fd, NULL, NULL); + if (connection_fd <= 0) + panic("ERROR on accept"); + threads_.emplace_back([this, connection_fd] {service(connection_fd);}); + } + } + + void service(int connection_fd) + { + std::stringstream ss; ss << "RIC tester thread " << std::this_thread::get_id(); + auto I = ss.str(); + std::clog << I << " starting connection on fd " << connection_fd << std::endl; + int report_count = 0; + auto log_time = Clock::now(); + try { + __gnu_cxx::stdio_filebuf + inbuf {connection_fd, std::ios::in}, + outbuf{data_pipe_[1], std::ios::out}; + std::istream input {&inbuf}; + std::ostream output{&outbuf}; + + while (alive && input.good()) + { + if (!data_callback_) + { + // Pipe raw data and let user decode JSON + char buf[4096]; + input.get(buf[0]); // Block until 1 char is ready + output.write(buf, 1 + input.readsome(buf+1, sizeof(buf)-1)); + } + else try + { + // Decode JSON message and pass it to user's callback + json message; + input >> message; + std::clog << I << " Message type: " << message.begin().key() << std::endl; + report_count += data_callback_(message); + } catch (const std::exception& ex) { + std::clog << I << " ERROR in CALLBACK: " << ex.what() << std::endl; + } + + if (Clock::now() > log_time) { + std::clog << I << " got " << report_count << " reports" << std::endl; + log_time += 10s; + } + } + } catch (const std::exception& ex) { + std::clog << I << " ERROR: " << ex.what() << std::endl; + } + std::clog << I << " closed connection after " << report_count << " reports" << std::endl; + } + +}; + +}