4cf1adef73381825180995a5511d861b53109310
[ric-plt/e2.git] / RIC-E2-TERMINATION / TEST / testAsn / sctpClient / sctpClient.cpp
1 /*
2  * Copyright 2020 AT&T Intellectual Property
3  * Copyright 2020 Nokia
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law fprintfor agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 //
19 // Created by adi ENZEL on 2/10/20.
20 //
21
22 #include "sctpClient.h"
23
24 #define READ_BUFFER_SIZE 64 * 1024
25
26
27 using namespace std;
28
29
30 void createHttpLocalSocket(SctpClient_t *sctpClient) {
31     struct sockaddr_in address{};
32     int addrlen = sizeof(address);
33     address.sin_family = AF_INET;
34     address.sin_addr.s_addr = INADDR_ANY;
35     address.sin_port = htons(9098);
36     sctpClient->httpSocket = accept(sctpClient->httpBaseSocket, (struct sockaddr *) &address, (socklen_t *) &addrlen) < 0;
37     if (sctpClient->httpSocket) {
38         fprintf(stderr, "Accept() error. %s\n", strerror(errno));
39         exit(-1);
40     }
41     struct epoll_event event{};
42     event.data.fd = sctpClient->httpSocket;
43     event.events = (EPOLLIN | EPOLLET);
44     if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_ADD, sctpClient->httpSocket, &event) < 0) {
45         fprintf(stderr, "epoll_ctl EPOLL_CTL_ADD, %s\n", strerror(errno));
46         close(sctpClient->httpSocket);
47         exit(-1);
48     }
49 }
50
51 int createEpoll(SctpClient &sctpClient) {
52     sctpClient.epoll_fd = epoll_create1(0);
53     if (sctpClient.epoll_fd == -1) {
54         fprintf(stderr, "failed to open epoll descriptor. %s\n", strerror(errno));
55         return -1;
56     }
57     return sctpClient.epoll_fd;
58 }
59
60
61
62 int createSctpConnction(SctpClient *sctpClient, const char *address, int port, bool local) {
63     sctpClient->sctpSock = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
64     if (sctpClient->sctpSock < 0) {
65         fprintf(stderr, "Socket Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
66         return -1;
67     }
68     auto optval = 1;
69     if (setsockopt(sctpClient->sctpSock, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
70         fprintf(stderr, "setsockopt SO_REUSEPORT Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
71         close(sctpClient->sctpSock);
72         return -1;
73     }
74     optval = 1;
75     if (setsockopt(sctpClient->sctpSock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
76         fprintf(stderr, "setsockopt SO_REUSEADDR Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
77         close(sctpClient->sctpSock);
78         return -1;
79     }
80     struct sockaddr_in6 servaddr = {};
81 //    struct addrinfo hints = {};
82 //    struct addrinfo *result;
83
84     servaddr.sin6_family = AF_INET6;
85     servaddr.sin6_port = htons(port);      /* daytime server */
86     inet_pton(AF_INET6, address, &servaddr.sin6_addr);
87     // the bind here is to maintain the client port this is only if the test is not on the same IP as the tested system
88     if (!local) {
89         struct sockaddr_in6 localAddr{};
90         localAddr.sin6_family = AF_INET6;
91         localAddr.sin6_addr = in6addr_any;
92         localAddr.sin6_port = htons(port);
93         if (bind(sctpClient->sctpSock, (struct sockaddr *) &localAddr, sizeof(struct sockaddr_in6)) < 0) {
94             fprintf(stderr, "bind Socket Error, %s %s, %d\n", strerror(errno), __func__, __LINE__);
95             return -1;
96         }//Ends the binding.
97     }
98
99     // Add to Epol
100     struct epoll_event event{};
101     event.data.fd = sctpClient->sctpSock;
102     event.events = (EPOLLOUT | EPOLLIN | EPOLLET);
103     if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_ADD, sctpClient->sctpSock, &event) < 0) {
104         fprintf(stderr, "epoll_ctl EPOLL_CTL_ADD, %s\n", strerror(errno));
105         close(sctpClient->sctpSock);
106         return -1;
107     }
108
109     char hostBuff[NI_MAXHOST];
110     char portBuff[NI_MAXHOST];
111
112     if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
113                     hostBuff, sizeof(hostBuff),
114                     portBuff, sizeof(portBuff),
115                     (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
116         fprintf(stderr, "getnameinfo() Error, %s  %s %d\n", strerror(errno), __func__, __LINE__);
117         return -1;
118     }
119
120     auto flags = fcntl(sctpClient->sctpSock, F_GETFL, 0);
121     if (flags == -1) {
122         fprintf(stderr, "fcntl error. %s\n", strerror(errno));
123         close(sctpClient->sctpSock);
124         return -1;
125     }
126
127     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
128     if (fcntl(sctpClient->sctpSock, F_SETFL, flags) == -1) {
129         fprintf(stderr, "fcntl set O_NONBLOCK fail. %s\n", strerror(errno));
130         close(sctpClient->sctpSock);
131         return -1;
132     }
133
134     if (connect(sctpClient->sctpSock, (SA *) &servaddr, sizeof(servaddr)) < 0) {
135         if (errno != EINPROGRESS) {
136             fprintf(stderr, "connect FD %d to host : %s port %d, %s\n", sctpClient->sctpSock, address, port,
137                     strerror(errno));
138             close(sctpClient->sctpSock);
139             return -1;
140         }
141         fprintf(stdout, "Connect to FD %d returned with EINPROGRESS : %s\n", sctpClient->sctpSock, strerror(errno));
142     }
143     return sctpClient->sctpSock;
144 }
145
146 __attribute_warn_unused_result__ int createListeningTcpConnection(SctpClient *sctpClient) {
147     if ((sctpClient->httpBaseSocket = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
148         fprintf(stderr, "socket failed. %s", strerror(errno));
149         return -1;
150     }
151
152     struct sockaddr_in address{};
153     address.sin_family = AF_INET;
154     address.sin_addr.s_addr = INADDR_ANY;
155     address.sin_port = htons(9098);
156
157     if (bind(sctpClient->httpBaseSocket, (struct sockaddr *)&address, sizeof(address)) < 0) {
158         fprintf(stderr, "Bind failed , %s %s, %d\n", strerror(errno), __func__, __LINE__);
159         return -1;
160     }
161
162     struct epoll_event event{};
163     event.data.fd = sctpClient->httpBaseSocket;
164     event.events = (EPOLLIN | EPOLLET);
165     if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_ADD, sctpClient->httpBaseSocket, &event) < 0) {
166         fprintf(stderr, "epoll_ctl EPOLL_CTL_ADD, %s\n", strerror(errno));
167         close(sctpClient->httpBaseSocket);
168         return -1;
169     }
170     if (listen(sctpClient->httpBaseSocket, 128) < 0)
171     {
172         fprintf(stderr,"listen() error. %s", strerror(errno));
173         return -1;
174     }
175     return 0;
176 }
177
178 __attribute_warn_unused_result__ int modifyEpollToRead(SctpClient *sctpClient, int modifiedSocket) {
179     struct epoll_event event{};
180     event.data.fd = modifiedSocket;
181     event.events = (EPOLLIN | EPOLLET);
182     if (epoll_ctl(sctpClient->epoll_fd, EPOLL_CTL_MOD, modifiedSocket, &event) < 0) {
183         fprintf(stderr, "failed to open epoll descriptor. %s\n", strerror(errno));
184         return -1;
185     }
186     return 0;
187 }
188
189
190 __attribute_warn_unused_result__ cxxopts::ParseResult parse(SctpClient &sctpClient, int argc, char *argv[]) {
191     cxxopts::Options options(argv[0], "sctp client test application");
192     options.positional_help("[optional args]").show_positional_help();
193     options.allow_unrecognised_options().add_options()
194             ("a,host", "Host address", cxxopts::value<std::string>(sctpClient.host)->default_value("127.0.0.1"))
195             ("p,port", "port number", cxxopts::value<int>(sctpClient.rmrPort)->default_value("38200"))
196             ("h,help", "Print help");
197
198     auto result = options.parse(argc, argv);
199
200     if (result.count("help")) {
201         std::cout << options.help({""}) << std::endl;
202         exit(0);
203     }
204     return result;
205 }
206
207 void run(SctpClient_t *sctpClient) {
208     cout << "in theread" << endl;
209     sleep(10);
210     cout << "in theread after sleep" << endl;
211     sleep(10);
212 }
213
214 void runFunc(SctpClient_t *sctpClient) {
215     cout << "in theread 1" << endl;
216
217     char rmrAddress[128] {};
218     cout << "in theread 2" << endl;
219
220     snprintf(rmrAddress, 128, "%d", sctpClient->rmrPort);
221     cout << "in theread 3" << endl;
222
223     RmrClient rmrClient = {rmrAddress, sctpClient->epoll_fd};
224     cout << "in theread 4" << endl;
225
226
227     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
228 //    auto counter = 1000;
229     //    uint64_t st = 0;
230 //    uint32_t aux1 = 0;
231 //    st = rdtscp(aux1);
232     E2AP_PDU_t *pdu = nullptr;
233
234     auto *msg = rmrClient.allocateRmrMsg(8192);
235     while (true) {
236         auto numOfEvents = epoll_wait(sctpClient->epoll_fd, events, MAXEVENTS, 1000);
237         if (numOfEvents < 0) {
238             if (errno == EINTR) {
239                 fprintf(stderr, "got EINTR : %s\n", strerror(errno));
240                 continue;
241             }
242             fprintf(stderr, "Epoll wait failed, errno = %s\n", strerror(errno));
243             break;
244         }
245         if (numOfEvents == 0) { // timeout
246 //            if (--counter <= 0) {
247 //                fprintf(stdout, "Finish waiting for epoll. going out of the thread\n");
248 //                continue;
249 //            }
250         }
251
252         auto done = 0;
253
254         for (auto i = 0; i < numOfEvents; i++) {
255             uint32_t aux1 = 0;
256             auto start = rdtscp(aux1);
257
258             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
259                 fprintf(stderr, "Got EPOLLERR or EPOLLHUP on fd = %d, errno = %s\n", events[i].data.fd,
260                         strerror(errno));
261                 close(events[i].data.fd);
262             } else if (events[i].events & EPOLLOUT) {  // AFTER EINPROGRESS
263                 if (modifyEpollToRead(sctpClient, events[i].data.fd) < 0) {
264                     fprintf(stderr, "failed modify FD %d after got EINPROGRESS\n", events[i].data.fd);
265                     close(events[i].data.fd);
266                     continue;
267                 }
268                 fprintf(stdout, "Connected to server after EinProgreevents[i].data.fdss FD %d\n", events[i].data.fd);
269                 //TODO need to define RmrClient class
270             } else if (events[i].data.fd == sctpClient->httpBaseSocket) {
271                 createHttpLocalSocket(sctpClient);
272             } else if (events[i].data.fd == sctpClient->httpSocket) {
273                 //TODO handle messages from the http server
274                 char buffer[READ_BUFFER_SIZE] {};
275                 while (true) {
276                     auto size = read(sctpClient->httpSocket, buffer, READ_BUFFER_SIZE);
277                     if (size < 0) {
278                         if (errno == EINTR) {
279                             continue;
280                         }
281                         if (errno != EAGAIN) {
282                             fprintf(stderr, "Read error, %s\n", strerror(errno));
283                             done = 1;
284                         }
285                         break; // EAGAIN exit from loop on read normal way or on read error
286                     }
287                     // we got message get the id of message
288                     char *tmp;
289
290                     // get mesage type
291                     char *val = strtok_r(buffer, sctpClient->delimiter, &tmp);
292                     messages_t messageType;
293                     char *dummy;
294                     if (val != nullptr) {
295                         messageType = (decltype(messageType))strtol(val, &dummy, 10);
296                     } else {
297                         fprintf(stderr,"wrong message %s", buffer);
298                         break;
299                     }
300                     char sctpLinkId[128] {};
301                     val = strtok_r(nullptr, sctpClient->delimiter, &tmp);
302                     if (val != nullptr) {
303                         memcpy(sctpLinkId, val, tmp - val);
304                     } else {
305                         fprintf(stderr,"wriong id %s", buffer);
306                         break;
307                     }
308
309                     char *values[128] {};
310                     int index = 0;
311                     while ((val = strtok_r(nullptr, sctpClient->delimiter, &tmp)) != nullptr) {
312                         auto valueLen = tmp - val;
313                         values[index] = (char *)calloc(1, valueLen);
314                         memcpy(values[index], val, valueLen);
315                         index++;
316                     }
317                     values[i] = (char *)calloc(1, strlen(tmp));
318
319                     switch ((int)messageType) {
320                         case setupRequest_gnb:
321                         case setupRequest_en_gNB:
322                         case setupRequest_ng_eNB:
323                         case setupRequest_eNB: {
324
325                             char *ricAddress = nullptr;
326                             if (values[0] != nullptr) {
327                                ricAddress = values[0];
328                             } else {
329                                 fprintf(stderr,"wrong address %s", buffer);
330                                 break;
331                             }
332                             //ric port
333                             int ricPort = 0;
334
335                             if (values[1] != nullptr) {
336                                 ricPort = (decltype(ricPort))strtol(values[1], &dummy, 10);
337                             } else {
338                                 fprintf(stderr,"wrong port %s", buffer);
339                                 for (auto e : values) {
340                                     if (e != nullptr) {
341                                         free(e);
342                                     }
343                                 }
344                                 break;
345                             }
346
347                             // need to send message to E2Term
348                             // build connection
349                             auto fd = createSctpConnction(sctpClient, (const char *)ricAddress, ricPort);
350                             if (fd < 0) {
351                                 fprintf(stderr,"Failed to create connection to %s:%d\n", ricAddress, ricPort);
352                                 for (auto e : values) {
353                                     if (e != nullptr) {
354                                         free(e);
355                                     }
356                                 }
357                                 break;
358                             }
359
360                             auto len = strlen(values[index]);
361                             auto *b64Decoded = (unsigned char *)calloc(1, len);
362                             base64::decode((const unsigned char *)values[index], len, b64Decoded, (long)len);
363
364                             for (auto e : values) {
365                                 if (e != nullptr) {
366                                     free(e);
367                                 }
368                             }
369                             // send data
370                             while (true) {
371                                 if (send(fd, b64Decoded, len, MSG_NOSIGNAL) < 0) {
372                                     if (errno == EINTR) {
373                                         continue;
374                                     }
375                                     cerr << "Error sendingdata to e2Term. " << strerror(errno) << endl;
376                                     break;
377                                 }
378                                 cout << "Message sent" << endl;
379                                 break;
380                             }
381                             free(b64Decoded);
382                             char key[128] {};
383                             char *value = (char *)calloc(1,256);
384                             snprintf(key, 128, "id:%s", sctpLinkId);
385                             snprintf(value, 16, "%d", fd);
386                             sctpClient->mapKey.setkey(key, (void *)value);
387
388                             snprintf(key, 128, "fd:%d", fd);
389                             snprintf(&value[128], 128, "%s", sctpLinkId);
390                             sctpClient->mapKey.setkey(key, (void *)&value[128]);
391                             break;
392                         }
393                         case nothing:
394                         default: {
395                             break;
396                         }
397                     }
398                  }
399             } else if (events[i].data.fd == rmrClient.getRmrFd()) {
400                 msg->state = 0;
401                 msg = rmr_rcv_msg(rmrClient.getRmrCtx(), msg);
402                 if (msg == nullptr) {
403                     cerr << "rmr_rcv_msg return with null pointer" << endl;
404                     exit(-1);
405                 } else if (msg->state != 0) {
406                     cerr << "rmr_rcv_msg return with error status number : " << msg->state << endl;
407                     msg->state = 0;
408                     continue;
409                 }
410          sleep(100);       cout << "Got RMR message number : " << msg->mtype << endl;
411
412             } else { // got data from server
413                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
414                 * We must read whatever data is available completely, as we are running
415                 *  in edge-triggered mode and won't get a notification again for the same data. */
416                 //TODO build a callback function to support many tests
417                 if (pdu != nullptr) {
418                     ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
419                 }
420                 unsigned char buffer[SCTP_BUFFER_SIZE]{};
421                 while (true) {
422                     auto len = read(events[i].data.fd, buffer, SCTP_BUFFER_SIZE);
423                     if (len < 0) {
424                         if (errno == EINTR) {
425                             continue;
426                         }
427                         /* If errno == EAGAIN, that means we have read all
428                         data. So go back to the main loop. */
429                         if (errno != EAGAIN) {
430                             fprintf(stderr, "Read error, %s\n", strerror(errno));
431                             done = 1;
432                         }
433                         break; // EAGAIN exit from loop on read normal way or on read error
434                     } else if (len == 0) {
435                         /* End of file. The remote has closed the connection. */
436                         fprintf(stdout, "EOF Closed connection - descriptor = %d", events[i].data.fd);
437                         done = 1;
438                         break;
439                     }
440
441                     asn_dec_rval_t rval;
442                     rval = asn_decode(nullptr,
443                                       ATS_ALIGNED_BASIC_PER,
444                                       &asn_DEF_E2AP_PDU,
445                                       (void **) &pdu,
446                                       buffer,
447                                       len);
448                     if (rval.code != RC_OK) {
449                         fprintf(stderr, "Error %d Decoding E2AP PDU from E2TERM\n", rval.code);
450                         break;
451                     }
452                     //TODO handle messages
453                     //                    switch (pdu->present) {
454                     //                        case E2AP_PDU_PR_initiatingMessage: {//initiating message
455                     //                            asnInitiatingRequest(pdu, message, rmrMessageBuffer);
456                     //                            break;
457                     //                        }
458                     //                        case E2AP_PDU_PR_successfulOutcome: { //successful outcome
459                     //                            asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer);
460                     //                            break;
461                     //                        }
462                     //                        case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
463                     //                            asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer);
464                     //                            break;
465                     //            ipv6 client server c program            }
466                     //                        case E2AP_PDU_PR_NOTHING:
467                     //                        default:
468                     //                            fprintf(stderr, "Unknown index %d in E2AP PDU\n", pdu->present);
469                     //                            break;
470                     //                    }
471                 }
472             }
473             aux1 = 0;
474             fprintf(stdout, "one loop took  %ld clocks\n", rdtscp(aux1) - start);
475         }
476         if (done) {
477             //TODO report to RMR on closed connection
478         }
479     }
480     return;// nullptr;
481 }
482
483 auto main(const int argc, char **argv) -> int {
484     SctpClient_t sctpClient;
485     //unsigned num_cpus = std::thread::hardware_concurrency();
486
487     auto result = parse(sctpClient, argc, argv);
488     auto epoll_fd = createEpoll(sctpClient);
489     if (epoll_fd <= 0) {
490         exit(-1);
491     }
492     if (createListeningTcpConnection(&sctpClient) < 0) {
493         exit(-1);
494     }
495     std::thread th(runFunc, &sctpClient);
496 //    std::thread th(run, &sctpClient);
497
498
499     sleep(29);
500     //start the http server
501     Port port(9080);
502
503     Address addr(Ipv4::any(), port);
504     HttpServer server(addr);
505
506     server.init(1);
507
508
509     server.start();
510
511     th.join();
512
513 }