Merge "IMPL: A1 <-> XApp Message Flow Testing"
[it/test.git] / simulators / workload_generators / e2e_testing / src / wg_concur.cpp
1 /*****************************************************************************
2 #                                                                            *
3 # Copyright 2020 AT&T Intellectual Property                                  *
4 # Copyright 2020 Nokia                                                       *
5 #                                                                            *
6 # Licensed under the Apache License, Version 2.0 (the "License");            *
7 # you may not use this file except in compliance with the License.           *
8 # You may obtain a copy of the License at                                    *
9 #                                                                            *
10 #      http://www.apache.org/licenses/LICENSE-2.0                            *
11 #                                                                            *
12 # Unless required by applicable law or agreed to in writing, software        *
13 # distributed under the License is distributed on an "AS IS" BASIS,          *
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   *
15 # See the License for the specific language governing permissions and        *
16 # limitations under the License.                                             *
17 #                                                                            *
18 ******************************************************************************/
19
20 #include <stdio.h>
21 #include <unistd.h>
22 #include <string>
23 #include <iostream>
24 #include <vector>
25 #include <thread>
26 #include "./wg_defs.h"
27 #include "e2sim_sctp.hpp"
28 #include "e2ap_message_handler.hpp"
29
30 extern "C" {
31 #include "E2AP-PDU.h"
32 #include "e2ap_asn1c_codec.h"
33 #include "ProtocolIE-Field.h"
34 }
35
36
37 using namespace std;
38
39 int keep_looping;
40 struct timespec* start_ts = (struct timespec*)malloc(sizeof(struct timespec));
41 vector<struct timespec*> msg_ts = vector<struct timespec*>(NUM_SAMPLE, NULL);
42 vector<float> latency_v = vector<float>(NUM_SAMPLE, 0.0);
43
44 int subresponse_get_sequenceNum(E2AP_PDU_t* pdu) {
45   SuccessfulOutcome_t* responseMsg = pdu->choice.successfulOutcome;
46   RICrequestID_t* requestid;
47   int num_IEs = responseMsg->value.choice.RICsubscriptionResponse.protocolIEs.\
48       list.count;
49
50   for (int edx = 0; edx < num_IEs; edx++) {
51     RICsubscriptionResponse_IEs_t* memb_ptr =
52         responseMsg->value.choice.RICsubscriptionResponse.protocolIEs.list.\
53         array[edx];
54     switch (memb_ptr->id) {
55       case (ProtocolIE_ID_id_RICrequestID):
56         requestid = &memb_ptr->value.choice.RICrequestID;
57         return requestid->ricRequestSequenceNumber;
58         break;
59     }
60   }
61 }
62
63 void subrequest_set_sequenceNum(E2AP_PDU_t* pdu, int seq) {
64   InitiatingMessage_t* initiatingMessage = pdu->choice.initiatingMessage;
65   RICrequestID_t* requestid;
66   int num_IEs = initiatingMessage->value.choice.RICsubscriptionRequest.\
67       protocolIEs.list.count;
68   for (int edx = 0; edx < num_IEs; edx++) {
69     RICsubscriptionRequest_IEs_t* memb_ptr = initiatingMessage->value.choice.\
70         RICsubscriptionRequest.protocolIEs.list.array[edx];
71     switch (memb_ptr->id) {
72       case (ProtocolIE_ID_id_RICrequestID):
73         requestid = &memb_ptr->value.choice.RICrequestID;
74         requestid->ricRequestSequenceNumber = seq;
75         break;
76     }
77   }
78 }
79
80 int wg_setup(int socket_fd) {
81   sctp_buffer_t recv_buf;
82   // stage 1: Receive ENDC_X2_Setup Request; Send ENDC_X2_Setup Response
83   while (sctp_receive_data(socket_fd, recv_buf) <= 0)
84     continue;
85   // decode the data into E2AP-PDU
86   E2AP_PDU_t* pdu = new E2AP_PDU_t();
87   e2ap_asn1c_decode_pdu(pdu, recv_buf.buffer, recv_buf.len);
88   int procedureCode = e2ap_asn1c_get_procedureCode(pdu);
89   int index = static_cast<int>(pdu->present);
90   e2ap_handle_ENDCX2SetupRequest(pdu, socket_fd);
91   // stage 2: send a response wait for request,if not keep sending
92   while (1) {
93     E2AP_PDU_t* res_pdu = e2ap_xml_to_pdu("E2AP_RICsubscriptionResponse.xml");
94     uint8_t* buf;
95     sctp_buffer_t data;
96     data.len = e2ap_asn1c_encode_pdu(res_pdu, &buf);
97     memcpy(data.buffer, buf, data.len);
98     if (sctp_send_data(socket_fd, data) > 0) {
99       LOG_I("[WG] Sent RIC-SUBSCRIPTION-RESPONSE");
100     } else {
101       LOG_E("[WG] Unable to send RIC-SUBSCRIPTION-RESPONSE to peer");
102     }
103     while (sctp_receive_data(socket_fd, recv_buf) <= 0)
104       continue;
105     e2ap_asn1c_decode_pdu(pdu, recv_buf.buffer, recv_buf.len);
106     // e2ap_asn1c_print_pdu(pdu);
107     procedureCode = e2ap_asn1c_get_procedureCode(pdu);
108     index = static_cast<int>(pdu->present);
109     if (procedureCode == ProcedureCode_id_ricSubscription && \
110         index == E2AP_PDU_PR_initiatingMessage) {
111       LOG_I("[WG] Received RIC-SUBSCRIPTION-REQUEST");
112       return 0;
113     }
114   }
115   return -1;
116 }
117
118 int wg_generator(int client_fd, int lapse) {
119   int count = 0;
120   while (1) {
121     E2AP_PDU_t* res_pdu = e2ap_xml_to_pdu("E2AP_RICsubscriptionResponse.xml");
122     subrequest_set_sequenceNum(res_pdu, count);
123     uint8_t* buf;
124     sctp_buffer_t data;
125     data.len = e2ap_asn1c_encode_pdu(res_pdu, &buf);
126     memcpy(data.buffer, buf, data.len);
127     // send response data over sctp
128     usleep(lapse);
129     if (sctp_send_data(client_fd, data) > 0) {
130       int index = count % keep_looping;
131       struct timespec* ts_p = (struct timespec*)malloc(sizeof(struct timespec));
132       clock_gettime(CLOCK_REALTIME, ts_p);
133       msg_ts[index] = ts_p;
134     } else {
135       LOG_E("[WG] Unable to send RIC-SUBSCRIPTION-RESPONSE to peer");
136       return -1;
137     }
138     count++;
139   }
140   return 0;
141 }
142
143 int wg_receiver(int client_fd) {
144   int count_msg_recved = 0;
145   clock_gettime(CLOCK_REALTIME, start_ts);
146   while (1) {
147     sctp_buffer_t recv_buf;
148     // stage 1: Receive ENDC_X2_Setup Request; Send ENDC_X2_Setup Response
149     while (sctp_receive_data(client_fd, recv_buf) <= 0)
150       continue;
151     count_msg_recved++;
152     // decode the data into E2AP-PDU
153     E2AP_PDU_t* pdu = new E2AP_PDU_t();
154     e2ap_asn1c_decode_pdu(pdu, recv_buf.buffer, recv_buf.len);
155     int seq_n = subresponse_get_sequenceNum(pdu);
156     int index = seq_n % keep_looping;
157     if (index == 0) {
158       char fname[256];
159       snprintf(fname, sizeof fname, "cdf.csv");
160       FILE *fptr = fopen(fname, "w");
161       for (int i = 0; i < latency_v.size(); i++)
162         fprintf(fptr, "%d %f\n", i, latency_v[i]);
163       fclose(fptr);
164     }
165     if (msg_ts[index] != NULL && index < msg_ts.size()) {
166       struct timespec* ts = (struct timespec*)malloc(sizeof(struct timespec));
167       clock_gettime(CLOCK_REALTIME, ts);
168       uint64_t elapse = (ts->tv_sec - msg_ts[index]->tv_sec) * 1000000000 + \
169           ts->tv_nsec - msg_ts[index]->tv_nsec;
170       double ms_elapse = static_cast<double>(elapse) / 1000000;
171       uint64_t total_elapse = (ts->tv_sec - start_ts->tv_sec) * 1000000000 + \
172           ts->tv_nsec - start_ts->tv_nsec;
173       double total_ms_elapse = static_cast<double>(total_elapse / 1000000);
174       latency_v[index] = ms_elapse;
175       cout << '\r' << ms_elapse << "ms " << count_msg_recved * 1000.0 / \
176                       total_ms_elapse << "msgs/second" << flush;
177     }
178   }
179 }
180
181 int main(int argc, char* argv[]) {
182   LOG_I("Start WG");
183   wg_options_t ops = wg_input_options(argc, argv);
184   int server_fd = sctp_start_server(ops.server_ip, ops.server_port);
185   int client_fd = sctp_accept_connection(ops.server_ip, server_fd);
186   keep_looping = NUM_SAMPLE;
187   LOG_I("[SCTP] Waiting for SCTP data");
188   uint64_t count = 0;
189   int lapse = static_cast<int>(((1.0/static_cast<double>(ops.rate)) * SEC2MUS));
190   wg_setup(client_fd);
191   thread generator = thread(wg_generator, client_fd, lapse);
192   thread receiver = thread(wg_receiver, client_fd);
193   generator.join();
194   receiver.join();
195   return 0;
196 }