2 * Copyright 2020 AT&T Intellectual Property
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Created by adi ENZEL on 2/10/20.
22 #include "sctpClient.h"
// Size in bytes of the buffer used when reading from the local HTTP socket.
// The value is parenthesised so the macro behaves as a single operand inside larger
// expressions (for example `x / READ_BUFFER_SIZE`).
#define READ_BUFFER_SIZE (64 * 1024)
30 void createHttpLocalSocket(SctpClient_t *sctpClient) {
31 struct sockaddr_in address{};
32 int addrlen = sizeof(address);
33 address.sin_family = AF_INET;
34 address.sin_addr.s_addr = INADDR_ANY;
35 address.sin_port = htons(9098);
36 sctpClient->httpSocket = accept(sctpClient->httpBaseSocket, (struct sockaddr *) &address, (socklen_t *) &addrlen) < 0;
37 if (sctpClient->httpSocket) {
38 fprintf(stderr, "Accept() error. %s\n", strerror(errno));
41 struct epoll_event event{};
42 event.data.fd = sctpClient->httpSocket;
43 event.events = (EPOLLIN | EPOLLET);
44 if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_ADD, sctpClient->httpSocket, &event) < 0) {
45 fprintf(stderr, "epoll_ctl EPOLL_CTL_ADD, %s\n", strerror(errno));
46 close(sctpClient->httpSocket);
51 int createEpoll(SctpClient &sctpClient) {
52 sctpClient.epoll_fd = epoll_create1(0);
53 if (sctpClient.epoll_fd == -1) {
54 fprintf(stderr, "failed to open epoll descriptor. %s\n", strerror(errno));
57 return sctpClient.epoll_fd;
// Opens an IPv6 SCTP stream socket, sets SO_REUSEPORT and SO_REUSEADDR on it, registers it with the
// client's epoll instance, switches it to non-blocking mode and starts connecting to address:port.
// The descriptor is stored in sctpClient->sctpSock and is what the final statement returns.
//   sctpClient - client state; epoll_fd is read, sctpSock is written.
//   address    - textual address of the peer, parsed with inet_pton(AF_INET6, ...).
//   port       - peer port; the same value is also used as the local port in the bind() below.
//   local      - NOTE(review): no use of this flag is visible in this view; presumably it guards the
//                local bind() - confirm against the full file.
// NOTE(review): several lines of this function are not visible here (the declaration of `optval`, the
// statements that follow each error log, and the `if` that tests `flags`), so the value returned on
// failure cannot be confirmed from this view.
62 int createSctpConnction(SctpClient *sctpClient, const char *address, int port, bool local) {
63 sctpClient->sctpSock = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
64 if (sctpClient->sctpSock < 0) {
65 fprintf(stderr, "Socket Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
// Socket options: on failure the socket is closed again.
69 if (setsockopt(sctpClient->sctpSock, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
70 fprintf(stderr, "setsockopt SO_REUSEPORT Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
71 close(sctpClient->sctpSock);
75 if (setsockopt(sctpClient->sctpSock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
76 fprintf(stderr, "setsockopt SO_REUSEADDR Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
77 close(sctpClient->sctpSock);
// Destination address, zero-initialised and then filled in field by field.
80 struct sockaddr_in6 servaddr = {};
81 // struct addrinfo hints = {};
82 // struct addrinfo *result;
84 servaddr.sin6_family = AF_INET6;
85 servaddr.sin6_port = htons(port); /* daytime server */
// NOTE(review): the inet_pton() result is not checked; text that is not a valid IPv6 address would
// presumably leave sin6_addr at its zero-initialised value.
86 inet_pton(AF_INET6, address, &servaddr.sin6_addr);
87 // the bind here is to maintain the client port this is only if the test is not on the same IP as the tested system
// Bind the local end to the wildcard address using the same port number as the destination.
89 struct sockaddr_in6 localAddr{};
90 localAddr.sin6_family = AF_INET6;
91 localAddr.sin6_addr = in6addr_any;
92 localAddr.sin6_port = htons(port);
93 if (bind(sctpClient->sctpSock, (struct sockaddr *) &localAddr, sizeof(struct sockaddr_in6)) < 0) {
94 fprintf(stderr, "bind Socket Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
// Register for write-readiness (connect completion) as well as reads, edge-triggered.
100 struct epoll_event event{};
101 event.data.fd = sctpClient->sctpSock;
102 event.events = (EPOLLOUT | EPOLLIN | EPOLLET);
103 if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_ADD, sctpClient->sctpSock, &event) < 0) {
104 fprintf(stderr, "epoll_ctl EPOLL_CTL_ADD, %s\n", strerror(errno));
105 close(sctpClient->sctpSock);
// Numeric host/service strings for the destination.
// NOTE(review): portBuff is sized with NI_MAXHOST; NI_MAXSERV is the usual constant for service buffers.
109 char hostBuff[NI_MAXHOST];
110 char portBuff[NI_MAXHOST];
// NOTE(review): getnameinfo() reports failures through its return code (see gai_strerror), so the
// strerror(errno) text logged below may not describe the actual error - confirm.
112 if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
113 hostBuff, sizeof(hostBuff),
114 portBuff, sizeof(portBuff),
115 (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
116 fprintf(stderr, "getnameinfo() Error, %s %s %d\n", strerror(errno), __func__, __LINE__);
// Switch the socket to non-blocking mode so connect() can return EINPROGRESS.
120 auto flags = fcntl(sctpClient->sctpSock, F_GETFL, 0);
122 fprintf(stderr, "fcntl error. %s\n", strerror(errno));
123 close(sctpClient->sctpSock);
127 flags = (unsigned) flags | (unsigned) O_NONBLOCK;
128 if (fcntl(sctpClient->sctpSock, F_SETFL, flags) == -1) {
129 fprintf(stderr, "fcntl set O_NONBLOCK fail. %s\n", strerror(errno));
130 close(sctpClient->sctpSock);
// Any connect() error other than EINPROGRESS is logged and the socket is closed;
// EINPROGRESS is logged to stdout.
134 if (connect(sctpClient->sctpSock, (SA *) &servaddr, sizeof(servaddr)) < 0) {
135 if (errno != EINPROGRESS) {
136 fprintf(stderr, "connect FD %d to host : %s port %d, %s\n", sctpClient->sctpSock, address, port,
138 close(sctpClient->sctpSock);
141 fprintf(stdout, "Connect to FD %d returned with EINPROGRESS : %s\n", sctpClient->sctpSock, strerror(errno));
143 return sctpClient->sctpSock;
146 __attribute_warn_unused_result__ int createListeningTcpConnection(SctpClient *sctpClient) {
147 if ((sctpClient->httpBaseSocket = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
148 fprintf(stderr, "socket failed. %s", strerror(errno));
152 struct sockaddr_in address{};
153 address.sin_family = AF_INET;
154 address.sin_addr.s_addr = INADDR_ANY;
155 address.sin_port = htons(9098);
157 if (bind(sctpClient->httpBaseSocket, (struct sockaddr *)&address, sizeof(address)) < 0) {
158 fprintf(stderr, "Bind failed , %s %s, %d\n", strerror(errno), __func__, __LINE__);
162 struct epoll_event event{};
163 event.data.fd = sctpClient->httpBaseSocket;
164 event.events = (EPOLLIN | EPOLLET);
165 if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_ADD, sctpClient->httpBaseSocket, &event) < 0) {
166 fprintf(stderr, "epoll_ctl EPOLL_CTL_ADD, %s\n", strerror(errno));
167 close(sctpClient->httpBaseSocket);
170 if (listen(sctpClient->httpBaseSocket, 128) < 0)
172 fprintf(stderr,"listen() error. %s", strerror(errno));
178 __attribute_warn_unused_result__ int modifyEpollToRead(SctpClient *sctpClient, int modifiedSocket) {
179 struct epoll_event event{};
180 event.data.fd = modifiedSocket;
181 event.events = (EPOLLIN | EPOLLET);
182 if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_MOD, modifiedSocket, &event) < 0) {
183 fprintf(stderr, "failed to open epoll descriptor. %s\n", strerror(errno));
// Parses the command line with cxxopts and stores the recognised values directly in sctpClient:
//   -a / --host  -> sctpClient.host     (default "127.0.0.1")
//   -p / --port  -> sctpClient.rmrPort  (default 38200)
//   -h / --help  -> prints the generated help text to std::cout
// Unrecognised options are accepted (allow_unrecognised_options).
// NOTE(review): the default host is an IPv4 literal, while createSctpConnction parses its address
// argument with inet_pton(AF_INET6, ...) - confirm the two are meant to be used together.
// NOTE(review): the lines after the help output and the function's return statement are not visible
// in this view; presumably the parse result is returned - confirm.
190 __attribute_warn_unused_result__ cxxopts::ParseResult parse(SctpClient &sctpClient, int argc, char *argv[]) {
191 cxxopts::Options options(argv[0], "sctp client test application");
192 options.positional_help("[optional args]").show_positional_help();
193 options.allow_unrecognised_options().add_options()
194 ("a,host", "Host address", cxxopts::value<std::string>(sctpClient.host)->default_value("127.0.0.1"))
195 ("p,port", "port number", cxxopts::value<int>(sctpClient.rmrPort)->default_value("38200"))
196 ("h,help", "Print help");
// The C-style cast adapts argv to the parameter type expected by options.parse().
198 auto result = options.parse(argc, (const char **&)argv);
200 if (result.count("help")) {
201 std::cout << options.help({""}) << std::endl;
// Minimal thread body that prints one message on entry and a second one afterwards.
// The sctpClient argument is not used by any visible line.
// NOTE(review): the statement between the two prints is not visible in this view; judging by the
// second message it is presumably a sleep - confirm.
207 void run(SctpClient_t *sctpClient) {
208 cout << "in theread" << endl;
210 cout << "in theread after sleep" << endl;
// Event loop executed on a worker thread (main starts it with std::thread).
// Builds an RmrClient from the configured RMR port and the shared epoll descriptor, allocates the
// epoll event array and an RMR message via allocateRmrMsg(8192), then waits on epoll with a 1000 ms
// timeout and handles each ready descriptor:
//   * EPOLLERR / EPOLLHUP   - log and close the descriptor
//   * EPOLLOUT              - reported after a connect that returned EINPROGRESS; the descriptor is
//                             re-armed for read events only
//   * httpBaseSocket        - accept the local HTTP connection (createHttpLocalSocket)
//   * httpSocket            - read a delimiter-separated command; for the setupRequest_* types open an
//                             SCTP connection, send the base64-decoded payload and record id/fd mappings
//   * rmrClient.getRmrFd()  - receive an RMR message
//   * any other descriptor  - read data and ASN.1-decode it as an E2AP PDU
// NOTE(review): many lines of this function (loop headers, several declarations, the statements after
// most error logs) are not visible in this view; the comments below describe only what the visible
// lines show.
214 void runFunc(SctpClient_t *sctpClient) {
215 cout << "in theread 1" << endl;
217 char rmrAddress[128] {};
218 cout << "in theread 2" << endl;
// The RMR address string is just the port number rendered as decimal text.
220 snprintf(rmrAddress, 128, "%d", sctpClient->rmrPort);
221 cout << "in theread 3" << endl;
223 RmrClient rmrClient = {rmrAddress, sctpClient->epoll_fd};
224 cout << "in theread 4" << endl;
// NOTE(review): no free() for this calloc'ed array is visible in this view - confirm it is released.
227 auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
228 // auto counter = 1000;
230 // uint32_t aux1 = 0;
231 // st = rdtscp(aux1);
// Decoded PDU; reset before each new decode in the data branch below.
232 E2AP_PDU_t *pdu = nullptr;
234 auto *msg = rmrClient.allocateRmrMsg(8192);
// Wait up to 1000 ms for events; a negative result is either EINTR or a real failure.
236 auto numOfEvents = epoll_wait(sctpClient->epoll_fd, events, MAXEVENTS, 1000);
237 if (numOfEvents < 0) {
238 if (errno == EINTR) {
239 fprintf(stderr, "got EINTR : %s\n", strerror(errno));
242 fprintf(stderr, "Epoll wait failed, errno = %s\n", strerror(errno));
245 if (numOfEvents == 0) { // timeout
246 // if (--counter <= 0) {
247 // fprintf(stdout, "Finish waiting for epoll. going out of the thread\n");
254 for (auto i = 0; i < numOfEvents; i++) {
// Timestamp used for the per-event timing printed at the end of the iteration.
256 auto start = rdtscp(aux1);
258 if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
259 fprintf(stderr, "Got EPOLLERR or EPOLLHUP on fd = %d, errno = %s\n", events[i].data.fd,
261 close(events[i].data.fd);
262 } else if (events[i].events & EPOLLOUT) { // AFTER EINPROGRESS
// Stop listening for EPOLLOUT; on failure the descriptor is closed.
263 if (modifyEpollToRead(sctpClient, events[i].data.fd) < 0) {
264 fprintf(stderr, "failed modify FD %d after got EINPROGRESS\n", events[i].data.fd);
265 close(events[i].data.fd);
// NOTE(review): the text "EinProgreevents[i].data.fdss" in the message below looks like an accidental
// paste into the word "EinProgress".
268 fprintf(stdout, "Connected to server after EinProgreevents[i].data.fdss FD %d\n", events[i].data.fd);
269 //TODO need to define RmrClient class
270 } else if (events[i].data.fd == sctpClient->httpBaseSocket) {
271 createHttpLocalSocket(sctpClient);
272 } else if (events[i].data.fd == sctpClient->httpSocket) {
273 //TODO handle messages from the http server
// READ_BUFFER_SIZE expands to 64 * 1024, so this is a 64 KiB zero-initialised local array.
274 char buffer[READ_BUFFER_SIZE] {};
// NOTE(review): a read that fills the whole buffer leaves no terminating NUL before strtok_r() is
// applied to it below.
276 auto size = read(sctpClient->httpSocket, buffer, READ_BUFFER_SIZE);
278 if (errno == EINTR) {
281 if (errno != EAGAIN) {
282 fprintf(stderr, "Read error, %s\n", strerror(errno));
285 break; // EAGAIN exit from loop on read normal way or on read error
287 // we got message get the id of message
// First token: the message type, parsed as a decimal number.
291 char *val = strtok_r(buffer, sctpClient->delimiter, &tmp);
292 messages_t messageType;
294 if (val != nullptr) {
295 messageType = (decltype(messageType))strtol(val, &dummy, 10);
297 fprintf(stderr,"wrong message %s", buffer);
// Second token: the SCTP link id.
300 char sctpLinkId[128] {};
301 val = strtok_r(nullptr, sctpClient->delimiter, &tmp);
302 if (val != nullptr) {
// NOTE(review): copies (tmp - val) bytes into the 128-byte sctpLinkId without a bounds check.
303 memcpy(sctpLinkId, val, tmp - val);
305 fprintf(stderr,"wriong id %s", buffer);
// Remaining tokens are copied into individually allocated entries of values[].
309 char *values[128] {};
311 while ((val = strtok_r(nullptr, sctpClient->delimiter, &tmp)) != nullptr) {
// NOTE(review): valueLen is (tmp - val); for a final token with no trailing delimiter this is
// presumably strlen(val), which would leave the copy without a terminating NUL - confirm.
312 auto valueLen = tmp - val;
313 values[index] = (char *)calloc(1, valueLen);
314 memcpy(values[index], val, valueLen);
// NOTE(review): unless a nearer declaration of `i` exists in lines not shown, this indexes values[]
// with the epoll event index rather than `index` - confirm intent.
317 values[i] = (char *)calloc(1, strlen(tmp));
319 switch ((int)messageType) {
320 case setupRequest_gnb:
321 case setupRequest_en_gNB:
322 case setupRequest_ng_eNB:
323 case setupRequest_eNB: {
// values[0] is taken as the RIC address and values[1] as the RIC port.
325 char *ricAddress = nullptr;
326 if (values[0] != nullptr) {
327 ricAddress = values[0];
329 fprintf(stderr,"wrong address %s", buffer);
335 if (values[1] != nullptr) {
336 ricPort = (decltype(ricPort))strtol(values[1], &dummy, 10);
338 fprintf(stderr,"wrong port %s", buffer);
339 for (auto e : values) {
347 // need to send message to E2Term
349 auto fd = createSctpConnction(sctpClient, (const char *)ricAddress, ricPort);
351 fprintf(stderr,"Failed to create connection to %s:%d\n", ricAddress, ricPort);
352 for (auto e : values) {
// Decode the base64 payload into a buffer as large as the encoded text.
// NOTE(review): no free(b64Decoded) is visible in this view, and the send() below transmits `len`
// bytes, i.e. the length of the base64 text rather than of the decoded data - confirm how
// base64::decode reports the decoded size.
360 auto len = strlen(values[index]);
361 auto *b64Decoded = (unsigned char *)calloc(1, len);
362 base64::decode((const unsigned char *)values[index], len, b64Decoded, (long)len);
364 for (auto e : values) {
371 if (send(fd, b64Decoded, len, MSG_NOSIGNAL) < 0) {
372 if (errno == EINTR) {
375 cerr << "Error sendingdata to e2Term. " << strerror(errno) << endl;
378 cout << "Message sent" << endl;
// One 256-byte allocation backs both map values: the fd as text at offset 0 (stored under "id:<link id>")
// and the link id at offset 128 (stored under "fd:<fd>").
383 char *value = (char *)calloc(1,256);
384 snprintf(key, 128, "id:%s", sctpLinkId);
385 snprintf(value, 16, "%d", fd);
386 sctpClient->mapKey.setkey(key, (void *)value);
388 snprintf(key, 128, "fd:%d", fd);
389 snprintf(&value[128], 128, "%s", sctpLinkId);
390 sctpClient->mapKey.setkey(key, (void *)&value[128]);
399 } else if (events[i].data.fd == rmrClient.getRmrFd()) {
// Receive into the previously allocated message; a null result or non-zero state is logged.
401 msg = rmr_rcv_msg(rmrClient.getRmrCtx(), msg);
402 if (msg == nullptr) {
403 cerr << "rmr_rcv_msg return with null pointer" << endl;
405 } else if (msg->state != 0) {
406 cerr << "rmr_rcv_msg return with error status number : " << msg->state << endl;
// NOTE(review): sleep(100) blocks this event loop for 100 seconds before the message type is
// logged - confirm this is intentional.
410 sleep(100); cout << "Got RMR message number : " << msg->mtype << endl;
412 } else { // got data from server
413 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
414 * We must read whatever data is available completely, as we are running
415 * in edge-triggered mode and won't get a notification again for the same data. */
416 //TODO build a callback function to support many tests
// Clear the PDU left over from a previous decode before reusing it.
417 if (pdu != nullptr) {
418 ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
420 unsigned char buffer[SCTP_BUFFER_SIZE]{};
422 auto len = read(events[i].data.fd, buffer, SCTP_BUFFER_SIZE);
424 if (errno == EINTR) {
427 /* If errno == EAGAIN, that means we have read all
428 data. So go back to the main loop. */
429 if (errno != EAGAIN) {
430 fprintf(stderr, "Read error, %s\n", strerror(errno));
433 break; // EAGAIN exit from loop on read normal way or on read error
434 } else if (len == 0) {
435 /* End of file. The remote has closed the connection. */
436 fprintf(stdout, "EOF Closed connection - descriptor = %d", events[i].data.fd);
// Decode the received bytes as aligned basic PER; a result other than RC_OK is logged.
442 rval = asn_decode(nullptr,
443 ATS_ALIGNED_BASIC_PER,
448 if (rval.code != RC_OK) {
449 fprintf(stderr, "Error %d Decoding E2AP PDU from E2TERM\n", rval.code);
452 //TODO handle messages
453 // switch (pdu->present) {
454 // case E2AP_PDU_PR_initiatingMessage: {//initiating message
455 // asnInitiatingRequest(pdu, message, rmrMessageBuffer);
458 // case E2AP_PDU_PR_successfulOutcome: { //successful outcome
459 // asnSuccessfulMsg(pdu, message, sctpMap, rmrMessageBuffer);
462 // case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
463 // asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer);
465 // ipv6 client server c program }
466 // case E2AP_PDU_PR_NOTHING:
468 // fprintf(stderr, "Unknown index %d in E2AP PDU\n", pdu->present);
// Report the rdtscp() tick difference since `start` was taken for this event.
474 fprintf(stdout, "one loop took %ld clocks\n", rdtscp(aux1) - start);
477 //TODO report to RMR on closed connection
483 auto main(const int argc, char **argv) -> int {
484 SctpClient_t sctpClient;
485 //unsigned num_cpus = std::thread::hardware_concurrency();
487 auto result = parse(sctpClient, argc, argv);
488 auto epoll_fd = createEpoll(sctpClient);
492 if (createListeningTcpConnection(&sctpClient) < 0) {
495 std::thread th(runFunc, &sctpClient);
496 // std::thread th(run, &sctpClient);
500 //start the http server
503 Address addr(Ipv4::any(), port);
504 HttpServer server(addr);