1 // VIAVI RIC Tester streaming data input and closed loop
3 // Maintainer: agustin.pozuelo@viavisolutions.com
5 // Copyright 2020 VIAVI Solutions
7 // Licensed under the Apache License, Version 2.0 (the "License")
8 // you may not use this file except in compliance with the License
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
16 // See the License for the specific language governing permissions and
17 // limitations under the License
22 #include <ext/stdio_filebuf.h>
29 #include <unordered_map>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <sys/socket.h>
40 #include <nlohmann/json.hpp>
45 using namespace std::chrono_literals;
46 using Clock = std::chrono::steady_clock;
47 using json = nlohmann::json;
49 void panic(const char* why)
51 throw std::runtime_error(std::string("RIC tester: ") + why
52 + ". ERRNO " + std::to_string(errno) + ": " + ::strerror(errno));
55 static int sample_data_callback(json& message)
57 std::stringstream ss; ss << "RIC tester thread " << std::this_thread::get_id();
60 if (message.contains("ueMeasReport")) {
61 json reports = message["ueMeasReport"]["ueMeasReportList"];
63 message["ueMeasReport"].erase("ueMeasReportList");
64 std::clog << I << " got " << reports.size() << " UE reports: " << message["ueMeasReport"];
66 for (auto& report: reports) {
68 int serving_cell = report["nrCellIdentity"];
69 int rsrp = report["servingCellRfReport"]["rsrp"];
72 if (message.contains("cellMeasReport")) {
73 json reports = message["cellMeasReport"]["cellMeasReportList"];
75 message["cellMeasReport"].erase("cellMeasReportList");
76 std::clog << I << " got " << reports.size() << " cell reports: " << message["cellMeasReport"];
78 for (auto& report: reports) {
80 int serving_cell = report["nrCellIdentity"];
81 int throughput = report["throughput"];
87 class RICTesterReceiver
90 RICTesterReceiver(int portno = 3001, std::function<int(json& message)> data_callback = sample_data_callback)
92 listen_fd = socket(AF_INET, SOCK_STREAM, 0);
94 if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) < 0)
95 panic("ERROR opening socket");
96 struct sockaddr_in serv_addr = {
97 .sin_family = AF_INET,
98 .sin_port = htons(portno),
99 .sin_addr = { .s_addr = INADDR_ANY}
101 if (bind(listen_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
104 printf("Listening on TCP 0.0.0.0:%d...\n", portno);
105 listen(listen_fd, 3); // Max 3 pending connections
107 set_callback(data_callback);
108 threads_.emplace_back([this] {server();});
111 void set_callback(std::function<int(json& message)> data_callback)
113 data_callback_ = data_callback;
114 if (!data_callback && data_pipe_[0] == STDIN_FILENO) {
115 printf("No data callback provided, creating data pipe...\n");
116 if (pipe(data_pipe_))
117 panic("Cannot create data pipe");
118 data_filebuf_ = __gnu_cxx::stdio_filebuf<char> {data_pipe_[0], std::ios::in};
122 std::basic_filebuf<char>* get_data_filebuf()
124 return &data_filebuf_;
130 for (auto& t: threads_)
136 std::vector<std::thread> threads_;
138 std::function<int(json& message)> data_callback_;
139 int data_pipe_[2] = {STDIN_FILENO, STDOUT_FILENO}; // Default to stdin, stdout
140 __gnu_cxx::stdio_filebuf<char> data_filebuf_;
146 int connection_fd = accept(listen_fd, NULL, NULL);
147 if (connection_fd <= 0)
148 panic("ERROR on accept");
149 threads_.emplace_back([this, connection_fd] {service(connection_fd);});
153 void service(int connection_fd)
155 std::stringstream ss; ss << "RIC tester thread " << std::this_thread::get_id();
157 std::clog << I << " starting connection on fd " << connection_fd << std::endl;
158 int report_count = 0;
159 auto log_time = Clock::now();
161 __gnu_cxx::stdio_filebuf<char>
162 inbuf {connection_fd, std::ios::in},
163 outbuf{data_pipe_[1], std::ios::out};
164 std::istream input {&inbuf};
165 std::ostream output{&outbuf};
167 while (alive && input.good())
171 // Pipe raw data and let user decode JSON
173 input.get(buf[0]); // Block until 1 char is ready
174 output.write(buf, 1 + input.readsome(buf+1, sizeof(buf)-1));
178 // Decode JSON message and pass it to user's callback
181 std::clog << I << " Message type: " << message.begin().key() << std::endl;
182 report_count += data_callback_(message);
183 } catch (const std::exception& ex) {
184 std::clog << I << " ERROR in CALLBACK: " << ex.what() << std::endl;
187 if (Clock::now() > log_time) {
188 std::clog << I << " got " << report_count << " reports" << std::endl;
192 } catch (const std::exception& ex) {
193 std::clog << I << " ERROR: " << ex.what() << std::endl;
195 std::clog << I << " closed connection after " << report_count << " reports" << std::endl;