8c152b02febd6d7b6c69646b74f88bc58e99ac4b
[sim/e2-interface.git] / e2sim / e2sm_examples / kpm_e2sm / src / kpm / viavi_connector.hpp
1 // VIAVI RIC Tester streaming data input and closed loop
2 //
3 // Maintainer: agustin.pozuelo@viavisolutions.com
4 //
5 // Copyright 2020 VIAVI Solutions
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License")
8 // you may not use this file except in compliance with the License
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
16 // See the License for the specific language governing permissions and
17 // limitations under the License
18
19 #include <atomic>
20 #include <chrono>
21 #include <functional>
22 #include <ext/stdio_filebuf.h>
23 #include <iostream>
24 #include <memory>
25 #include <mutex>
26 #include <stdexcept>
27 #include <sstream>
28 #include <thread>
29 #include <unordered_map>
30 #include <vector>
31
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <sys/socket.h>
35 #include <sys/un.h>
36 #include <stdlib.h>
37 #include <stdio.h>
38 #include <unistd.h>
39
40 #include <nlohmann/json.hpp>
41
42
43 namespace viavi {
44
45 using namespace std::chrono_literals;
46 using Clock = std::chrono::steady_clock;
47 using json = nlohmann::json;
48
49 void panic(const char* why)
50 {
51     throw std::runtime_error(std::string("RIC tester: ") + why
52         + ". ERRNO " + std::to_string(errno) + ": " + ::strerror(errno));
53 }
54
55 static int sample_data_callback(json& message)
56 {
57     std::stringstream ss; ss << "RIC tester thread " << std::this_thread::get_id();
58     auto I = ss.str();
59     int report_count = 0;
60     if (message.contains("ueMeasReport")) {
61         json reports = message["ueMeasReport"]["ueMeasReportList"];
62         
63         message["ueMeasReport"].erase("ueMeasReportList");
64         std::clog << I << " got " << reports.size() << " UE reports: " << message["ueMeasReport"];
65         
66         for (auto& report: reports) {
67             ++report_count;
68             int serving_cell = report["nrCellIdentity"];
69             int rsrp = report["servingCellRfReport"]["rsrp"];
70         }
71     }
72     if (message.contains("cellMeasReport")) {
73         json reports = message["cellMeasReport"]["cellMeasReportList"];
74         
75         message["cellMeasReport"].erase("cellMeasReportList");
76         std::clog << I << " got " << reports.size() << " cell reports: " << message["cellMeasReport"];
77         
78         for (auto& report: reports) {
79             ++report_count;
80             int serving_cell = report["nrCellIdentity"];
81             int throughput = report["throughput"];
82         }
83     }
84     return report_count;
85 }
86
87 class RICTesterReceiver
88 {
89     public:
90     RICTesterReceiver(int portno = 3001, std::function<int(json& message)> data_callback = sample_data_callback)
91     {
92         listen_fd = socket(AF_INET, SOCK_STREAM, 0);
93         int enabled = 1;
94         if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) < 0)
95             panic("ERROR opening socket");
96         struct sockaddr_in serv_addr = {
97             .sin_family = AF_INET,
98             .sin_port = htons(portno),
99             .sin_addr = { .s_addr = INADDR_ANY}
100         };
101         if (bind(listen_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
102             panic("Bind error");
103
104         printf("Listening on TCP 0.0.0.0:%d...\n", portno);
105         listen(listen_fd, 3); // Max 3 pending connections
106
107         set_callback(data_callback);
108         threads_.emplace_back([this] {server();});
109     }
110
111     void set_callback(std::function<int(json& message)> data_callback)
112     {
113         data_callback_ = data_callback;
114         if (!data_callback && data_pipe_[0] == STDIN_FILENO) {
115             printf("No data callback provided, creating data pipe...\n");
116             if (pipe(data_pipe_))
117                 panic("Cannot create data pipe");
118             data_filebuf_ = __gnu_cxx::stdio_filebuf<char> {data_pipe_[0], std::ios::in};
119         }
120     }
121
122     std::basic_filebuf<char>* get_data_filebuf()
123     {
124         return &data_filebuf_;
125     }
126
127     ~RICTesterReceiver()
128     {
129         alive = false;
130         for (auto& t: threads_)
131             t.join();
132     }
133
134     protected:
135     int listen_fd;
136     std::vector<std::thread> threads_;
137     bool alive = true;
138     std::function<int(json& message)> data_callback_;
139     int data_pipe_[2] = {STDIN_FILENO, STDOUT_FILENO};  // Default to stdin, stdout
140     __gnu_cxx::stdio_filebuf<char> data_filebuf_;
141
142     void server()
143     {
144         while (alive)
145         {
146             int connection_fd = accept(listen_fd, NULL, NULL);
147             if (connection_fd <= 0)
148                 panic("ERROR on accept");
149             threads_.emplace_back([this, connection_fd] {service(connection_fd);});
150         }
151     }
152
153     void service(int connection_fd)
154     {
155         std::stringstream ss; ss << "RIC tester thread " << std::this_thread::get_id();
156         auto I = ss.str();
157         std::clog << I << " starting connection on fd " << connection_fd << std::endl;
158         int report_count = 0;
159         auto log_time = Clock::now();
160         try {
161             __gnu_cxx::stdio_filebuf<char>
162                 inbuf {connection_fd, std::ios::in},
163                 outbuf{data_pipe_[1], std::ios::out};
164             std::istream input {&inbuf};
165             std::ostream output{&outbuf};
166
167             while (alive && input.good())
168             {
169                 if (!data_callback_)
170                 {
171                     // Pipe raw data and let user decode JSON
172                     char buf[4096];
173                     input.get(buf[0]); // Block until 1 char is ready
174                     output.write(buf, 1 + input.readsome(buf+1, sizeof(buf)-1));
175                 }
176                 else try
177                 {
178                     // Decode JSON message and pass it to user's callback
179                     json message;
180                     input >> message;
181                     std::clog << I << " Message type: " << message.begin().key() << std::endl;
182                     report_count += data_callback_(message);
183                 } catch (const std::exception& ex) {
184                     std::clog << I << " ERROR in CALLBACK: " << ex.what() << std::endl;
185                 }
186
187                 if (Clock::now() > log_time) {
188                     std::clog << I << " got " << report_count << " reports" << std::endl;
189                     log_time += 10s;
190                 }
191             }
192         } catch (const std::exception& ex) {
193             std::clog << I << " ERROR: " << ex.what() << std::endl;
194         }
195         std::clog << I << " closed connection after " << report_count << " reports" << std::endl;
196     }
197
198 };
199
200 }