first drop of WLG source and directory
[it/test.git] / simulators / workload_generators / e2e_testing / src / wg_serial.cpp
1 /*****************************************************************************
2 #                                                                            *
3 # Copyright 2020 AT&T Intellectual Property                                  *
4 # Copyright 2020 Nokia                                                       *
5 #                                                                            *
6 # Licensed under the Apache License, Version 2.0 (the "License");            *
7 # you may not use this file except in compliance with the License.           *
8 # You may obtain a copy of the License at                                    *
9 #                                                                            *
10 #      http://www.apache.org/licenses/LICENSE-2.0                            *
11 #                                                                            *
12 # Unless required by applicable law or agreed to in writing, software        *
13 # distributed under the License is distributed on an "AS IS" BASIS,          *
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   *
15 # See the License for the specific language governing permissions and        *
16 # limitations under the License.                                             *
17 #                                                                            *
18 ******************************************************************************/
19
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string>
23 #include <iostream>
24 #include <vector>
25 #include <thread>
26 #include "./wg_defs.h"
27 #include "e2sim_sctp.hpp"
28 #include "e2ap_message_handler.hpp"
29
30 extern "C" {
31 #include "E2AP-PDU.h"
32 #include "e2ap_asn1c_codec.h"
33 #include "ProtocolIE-Field.h"
34 }
35
36
37 using namespace std;
38
39 int keep_looping;
40 struct timespec* start_ts = (struct timespec*)malloc(sizeof(struct timespec));
41 vector<struct timespec*> msg_ts = vector<struct timespec*>(NUM_SAMPLE, NULL);
42 vector<float> latency_v = vector<float>(NUM_SAMPLE, 0.0);
43
44 int subresponse_get_sequenceNum(E2AP_PDU_t* pdu) {
45   SuccessfulOutcome_t* responseMsg = pdu->choice.successfulOutcome;
46   RICrequestID_t* requestid;
47   int num_IEs = responseMsg->value.choice.RICsubscriptionResponse.protocolIEs.\
48       list.count;
49
50   for (int edx = 0; edx < num_IEs; edx++) {
51     RICsubscriptionResponse_IEs_t* memb_ptr =
52         responseMsg->value.choice.RICsubscriptionResponse.protocolIEs.list.\
53         array[edx];
54     switch (memb_ptr->id) {
55       case (ProtocolIE_ID_id_RICrequestID):
56         requestid = &memb_ptr->value.choice.RICrequestID;
57         return requestid->ricRequestSequenceNumber;
58         break;
59     }
60   }
61 }
62
63 void subrequest_set_sequenceNum(E2AP_PDU_t* pdu, int seq) {
64   InitiatingMessage_t* initiatingMessage = pdu->choice.initiatingMessage;
65   RICrequestID_t* requestid;
66   int num_IEs = initiatingMessage->value.choice.RICsubscriptionRequest.\
67       protocolIEs.list.count;
68   for (int edx = 0; edx < num_IEs; edx++) {
69     RICsubscriptionRequest_IEs_t* memb_ptr = initiatingMessage->value.choice.\
70         RICsubscriptionRequest.protocolIEs.list.array[edx];
71     switch (memb_ptr->id) {
72       case (ProtocolIE_ID_id_RICrequestID):
73         requestid = &memb_ptr->value.choice.RICrequestID;
74         requestid->ricRequestSequenceNumber = seq;
75         break;
76     }
77   }
78 }
79
80 int wg_setup(int socket_fd) {
81   sctp_buffer_t recv_buf;
82   // stage 1: Receive ENDC_X2_Setup Request; Send ENDC_X2_Setup Response
83   while (sctp_receive_data(socket_fd, recv_buf) <= 0)
84     continue;
85   // decode the data into E2AP-PDU
86   E2AP_PDU_t* pdu = new E2AP_PDU_t();
87   e2ap_asn1c_decode_pdu(pdu, recv_buf.buffer, recv_buf.len);
88   int procedureCode = e2ap_asn1c_get_procedureCode(pdu);
89   int index = static_cast<int>(pdu->present);
90   e2ap_handle_ENDCX2SetupRequest(pdu, socket_fd);
91   // stage 2: send a response wait for request,if not keep sending
92   while (1) {
93     E2AP_PDU_t* res_pdu = e2ap_xml_to_pdu("E2AP_RICsubscriptionResponse.xml");
94     uint8_t* buf;
95     sctp_buffer_t data;
96     data.len = e2ap_asn1c_encode_pdu(res_pdu, &buf);
97     memcpy(data.buffer, buf, data.len);
98     if (sctp_send_data(socket_fd, data) > 0) {
99       LOG_I("[WG] Sent RIC-SUBSCRIPTION-RESPONSE");
100     } else {
101       LOG_E("[WG] Unable to send RIC-SUBSCRIPTION-RESPONSE to peer");
102     }
103     while (sctp_receive_data(socket_fd, recv_buf) <= 0)
104       continue;
105     e2ap_asn1c_decode_pdu(pdu, recv_buf.buffer, recv_buf.len);
106     // e2ap_asn1c_print_pdu(pdu);
107     procedureCode = e2ap_asn1c_get_procedureCode(pdu);
108     index = static_cast<int>(pdu->present);
109     if (procedureCode == ProcedureCode_id_ricSubscription && \
110         index == E2AP_PDU_PR_initiatingMessage) {
111       LOG_I("[WG] Received RIC-SUBSCRIPTION-REQUEST");
112       return 0;
113     }
114   }
115   return -1;
116 }
117
118 int wg_receiver(int client_fd) {
119   int count_msg_recved = 0;
120   sctp_buffer_t recv_buf;
121   // stage 1: Receive ENDC_X2_Setup Request; Send ENDC_X2_Setup Response
122   while (sctp_receive_data(client_fd, recv_buf) <= 0)
123     continue;
124   count_msg_recved++;
125   // decode the data into E2AP-PDU
126   E2AP_PDU_t* pdu = new E2AP_PDU_t();
127   e2ap_asn1c_decode_pdu(pdu, recv_buf.buffer, recv_buf.len);
128   int seq_n = subresponse_get_sequenceNum(pdu);
129   int index = seq_n % keep_looping;
130   if (index == 0) {
131     char fname[256];
132     snprintf(fname, sizeof fname, "cdf.csv");
133     FILE *fptr = fopen(fname, "w");
134     for (int i = 0; i < latency_v.size(); i++)
135       fprintf(fptr, "%d %f\n", i, latency_v[i]);
136     fclose(fptr);
137   }
138   if (msg_ts[index] != NULL && index < msg_ts.size()) {
139     struct timespec* ts = (struct timespec*)malloc(sizeof(struct timespec));
140     clock_gettime(CLOCK_REALTIME, ts);
141     uint64_t elapse = (ts->tv_sec - msg_ts[index]->tv_sec) * 1000000000 + \
142         ts->tv_nsec - msg_ts[index]->tv_nsec;
143     double ms_elapse = static_cast<double>(elapse) / 1000000;
144     uint64_t total_elapse = (ts->tv_sec - start_ts->tv_sec) * 1000000000 + \
145         ts->tv_nsec - start_ts->tv_nsec;
146     double total_ms_elapse = static_cast<double>(total_elapse / 1000000);
147     latency_v[index] = ms_elapse;
148     cout << '\r' << ms_elapse << "ms " << count_msg_recved * 1000.0 / \
149         total_ms_elapse << "msgs/second" << flush;
150   }
151 }
152
153 int wg_generator(int client_fd, int lapse) {
154   int count = 0;
155   clock_gettime(CLOCK_REALTIME, start_ts);
156   while (1) {
157     if (count > NUM_SAMPLE) {
158       count = 0;
159       clock_gettime(CLOCK_REALTIME, start_ts);
160     }
161     E2AP_PDU_t* res_pdu = e2ap_xml_to_pdu("E2AP_RICsubscriptionResponse.xml");
162     subrequest_set_sequenceNum(res_pdu, count);
163     uint8_t* buf;
164     sctp_buffer_t data;
165     data.len = e2ap_asn1c_encode_pdu(res_pdu, &buf);
166     memcpy(data.buffer, buf, data.len);
167     // send response data over sctp
168     usleep(lapse);
169     if (sctp_send_data(client_fd, data) > 0) {
170       int index = count % keep_looping;
171       struct timespec* ts_p = (struct timespec*)malloc(sizeof(struct timespec));
172       clock_gettime(CLOCK_REALTIME, ts_p);
173       msg_ts[index] = ts_p;
174     } else {
175       LOG_E("[WG] Unable to send RIC-SUBSCRIPTION-RESPONSE to peer");
176       return -1;
177     }
178     count++;
179     wg_receiver(client_fd);
180   }
181   cout << endl;
182   return 0;
183 }
184
185 int main(int argc, char* argv[]) {
186   LOG_I("Start WG");
187   wg_options_t ops = wg_input_options(argc, argv);
188   int server_fd = sctp_start_server(ops.server_ip, ops.server_port);
189   int client_fd = sctp_accept_connection(ops.server_ip, server_fd);
190   keep_looping = NUM_SAMPLE;
191   LOG_I("[SCTP] Waiting for SCTP data");
192   uint64_t count = 0;
193   int lapse = static_cast<int>((1.0/static_cast<double>(ops.rate)) * SEC2MUS);
194   wg_setup(client_fd);
195   wg_generator(client_fd, lapse);
196 }