#include <thread>
+// globally list gnodeb-id we use
+std::string gNodeBID = "abc123";
+
// global queue for testing
-std::queue<std::string> message_bus;
+// acts like a channel
+std::queue<std::pair<std::string, std::string>> message_bus;
// global lock for testing
std::mutex get_object ;
bool is_running = true;
-bool mock_fail(int mtype, size_t len, void * payload, int mode){
+
+// ==================================================
+// various mock transmission functions that simulate underlying
+// transmission layer behaviour
+
+// this function immediately fails
+// simulates a channel that is not available
+bool mock_fail(int mtype, size_t len, void * payload, std::string gNodeB_id, int mode){
return false;
}
-bool mock_silent(int mtype, size_t len, void * payload, int mode){
+// silently returns without actually doing any transmission
+// simulates a lost transmission
+bool mock_silent(int mtype, size_t len, void * payload, std::string gNodeB_id, int mode){
return true;
}
-
-bool mock_tx(int mytpe, size_t len, void *payload, int mode){
+// simulates a working transmission channel
+bool mock_tx(int mytpe, size_t len, void *payload, std::string gNodeB_id, int mode){
bool res;
int i;
std::lock_guard<std::mutex> guard(get_object);
std::string msg((char *)buffer, buf_size);
//std::cout <<"Pushed to queue" << std::endl;
- message_bus.push(msg);
+ message_bus.push(std::make_pair(gNodeB_id, msg));
}
ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
if(msg_ok)
- return true;
-
+ return true;
else
return false;
-// Randomly generate number of subscription response and delete
-// response packets and push to queue
-void random_tx(int num_packets){
- subscription_response_helper he_resp;
- subscription_response sub_resp;
- subscription_delete sub_del_req;
- subscription_delete_response sub_del_resp;
- bool res;
- unsigned char buffer[256];
- size_t buf_size = 256;
-
- he_resp.add_action(10);
-
- // generate subscription responses
- for(int i = 0; i < num_packets; i++){
-
- // set up response object
- he_resp.set_request(i, 1);
- he_resp.set_function_id(0);
- buf_size = 256;
- res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true);
- {
- std::lock_guard<std::mutex> guard(get_object);
- std::string msg((char *)buffer, buf_size);
- message_bus.push(msg);
- }
-
- buf_size = 256;
- res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false);
- {
- std::lock_guard<std::mutex> guard(get_object);
- std::string msg((char *)buffer, buf_size);
- message_bus.push(msg);
- }
-
- buf_size = 256;
- res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true);
- {
- std::lock_guard<std::mutex> guard(get_object);
- std::string msg((char *)buffer, buf_size);
- message_bus.push(msg);
- }
-
- buf_size = 256;
- res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false);
- {
- std::lock_guard<std::mutex> guard(get_object);
- std::string msg((char *)buffer, buf_size);
- message_bus.push(msg);
- }
-
-
- }
-}
+// simulates response :takes what is in the queue, processes it and then invokes
+// subscription_handler response
+void mock_RAN (subscription_handler &_ref_sub_handler, int delay){
-void mock_RAN (subscription_handler &_ref_sub_handler){
// Behaviour :
-
+ std::string gNodeB_id;
unsigned char incorrect_e2ap[128];
size_t incorrect_e2ap_size = 128;
for(int i = 0; i < 128; i++){
incorrect_e2ap[i] = 'b';
}
-
+
FILE *pfile = fopen("test-data/e2ap_indication_test.per", "r");
if(pfile == NULL){
std::cout << "Error opening e2ap_indication_test.per" << std::endl;
fclose(pfile);
unsigned char message_buf[512];
- std::string pdu;
-
+ std::pair<std::string, std::string> pdu;
+
+ bool is_resp;
while(is_running){
- // send some random data, i.e incorrect E2AP
- _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size);
+
+ // test illegal packet response : send some random data, i.e incorrect E2AP
+ _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size, gNodeBID.c_str());
//std::cout <<"Sent random data to subscription handler" << std::endl;
- // send an E2AP which is not subscription request
- _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size);
+ //test incorrect packet response : send an E2AP which is not subscription request
+ _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size, gNodeBID.c_str());
//std::cout <<"Sent incorrect e2ap to subscription handler" << std::endl;
// now look in the queue, pop it and send the data
- // finally send correct payload
+ // finally send correct payload if queue not empty
+ is_resp = false;
{
std::lock_guard<std::mutex> guard(get_object);
if(! message_bus.empty()){
pdu = message_bus.front();
- memcpy(message_buf, pdu.c_str(), pdu.length());
+ gNodeB_id = pdu.first;
+ memcpy(message_buf, pdu.second.c_str(), pdu.second.length());
message_bus.pop();
+ is_resp =true;
}
}
-
- _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.length());
- //std::cout <<"Sent response to subscription handler" << std::endl;
-
-
-
- sleep(1);
+
+ if(is_resp){
+ sleep(delay);
+ _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.second.length(), gNodeB_id.c_str());
+ //std::cout <<"Sent response to subscription handler" << std::endl;
+ }
+
}
}
+// wrapper function that we use to test sending subscriptions with various channels
+void send_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, std::vector<std::string> & gNodeBs, int index, bool (*tx)(int, size_t, void *, std::string, int), int mode ){
-void send_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, int index, bool (*tx)(int, size_t, void *, int), int mode ){
subscription_helper subscription_info;
subscription_request sub_req;
subscription_response_helper subscription_response_info;
subscription_info.add_action(action_id, action_type);
subscription_info.set_event_def(&event_buf[0], event_buf_len);
- auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
- res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
+ auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index], mode);
+ res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBs[index], RIC_SUB_REQ, transmitter);
if (res == SUBSCR_SUCCESS ){
- // store -ve of request id
- status_vector[index] = -1 * subscription_info.get_request_id();
+ status_vector[index] = -1 ;
}
else{
status_vector[index] = res;
std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl;
}
-void delete_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, int index, int request_id, bool ( *tx)(int, size_t, void *, int), int mode ){
+
+// wrapper function that we use to test sending delete requests with various channels
+void delete_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, std::vector<std::string> & gNodeBs, int index, int request_id, bool ( *tx)(int, size_t, void *, std::string, int), int mode ){
subscription_helper subscription_info;
subscription_response_helper subscription_response_info;
//verify subscription deleted
- subscription_info.set_request(request_id, 1);
+ subscription_info.set_request(0, 0);
subscription_info.set_function_id(0);
- auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
- status_vector[index] = subscription_manager.RequestSubscriptionDelete(subscription_info, subscription_response_info, RIC_SUB_DEL_REQ, transmitter);
+ auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index], mode);
+ status_vector[index] = subscription_manager.request_subscription_delete(subscription_info, subscription_response_info, gNodeBs[index], RIC_SUB_DEL_REQ, transmitter);
};
-TEST_CASE("Test subscription work flow", "E2AP Subscription"){
+TEST_CASE("Test various channel responses", "E2AP Subscription"){
subscription_handler subscription_manager;
mdclog_attr_init(&attr);
mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
mdclog_init(attr);
- mdclog_level_set(MDCLOG_DEBUG);
+ mdclog_level_set(MDCLOG_INFO);
mdclog_attr_destroy(attr);
- unsigned char node_buffer[32];
- std::string gNodeB = "TEST_GNOBDE";
-
- std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
- node_buffer[gNodeB.length()] = '\0';
-
-
- //====================================
-
+ //====================================
SECTION("Verify behaviour if no listener "){
std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl;
subscription_manager.clear();
std::vector<std::thread> source_list;
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_fail), 0));
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_fail), 0));
}
for(int i = 0; i < num_sources; i++){
subscription_manager.clear();
std::vector<std::thread> source_list;
-
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
+
for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_silent), 0));
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_silent), 0));
}
for(int i = 0; i < num_sources; i++){
}
+}
+
+
+TEST_CASE("Test config", "E2AP Subscription"){
+
+ subscription_handler subscription_manager;
+
+ mdclog_attr_t *attr;
+ mdclog_attr_init(&attr);
+ mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
+ mdclog_init(attr);
+ mdclog_level_set(MDCLOG_INFO);
+ mdclog_attr_destroy(attr);
+
SECTION("Verify timeout behaviour if listener does not response"){
std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR " << std::endl;
int timeout_val = 2;
int num_tries = 1;
std::vector<int> status_vector(num_sources, 0);
-
+
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
+
subscription_manager.clear();
subscription_manager.set_timeout(timeout_val);
subscription_manager.set_num_retries(num_tries);
auto start = std::chrono::steady_clock::now();
- send_request(subscription_manager, status_vector, 0, mock_silent, 0);
+ send_request(subscription_manager, status_vector, gNodeBs, 0, mock_silent, 0);
auto end = std::chrono::steady_clock::now();
auto diff = end - start;
status_vector.clear();
start = std::chrono::steady_clock::now();
- send_request(subscription_manager, status_vector, 0, mock_silent, 0);
+ send_request(subscription_manager, status_vector, gNodeBs, 0, mock_silent, 0);
end = std::chrono::steady_clock::now();
diff = end - start;
}
+}
+
+TEST_CASE("Test sunny day scenarios", "E2AP Subscription"){
+
+ subscription_handler subscription_manager;
+
+ mdclog_attr_t *attr;
+ mdclog_attr_init(&attr);
+ mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
+ mdclog_init(attr);
+ mdclog_level_set(MDCLOG_INFO);
+ mdclog_attr_destroy(attr);
+
+ SECTION("Verify subscription request/response success"){
+ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl;
+
+ subscription_manager.clear();
+
+ int num_sources = 10;
+ int num_sinks = 5;
+
+ std::vector<std::thread> source_list;
+ std::vector<std::thread> sink_list;
+
+ std::vector<int> status_vector(num_sources, 0);
+
+ // First Test null cases in queries,i.e non-existing request
+ subscription_identifier id = std::make_tuple (gNodeBID, 10);
+ REQUIRE(subscription_manager.is_subscription_entry(id) == false);
+ id = std::make_tuple(gNodeBID, 1);
+ REQUIRE(subscription_manager.is_request_entry(id) == false);
+ REQUIRE(subscription_manager.get_request_status(id) == -1);
+
+ // start up the sinks
+ is_running = true;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+ }
+
+ // generate the gnodeb list for which we are subscribing
+ // default ran_function_id is zero
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
+
+ for(int i = 0; i < num_sources; i++){
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
+ }
+
+
+ for(int i = 0; i < num_sources; i++){
+ source_list[i].join();
+ REQUIRE(status_vector[i] < 0);
+ id = std::make_tuple(gNodeBs[i], 0);
+ REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+ }
+
+ // stop the sinks
+ is_running =false;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list[i].join();
+ }
+
+ REQUIRE(subscription_manager.num_complete() == num_sources);
+ REQUIRE(subscription_manager.num_pending() == 0);
+
+ // test getting subscription :
+ // case 1: fake request
+ id = std::make_tuple(gNodeBID, 0);
+ const subscription_response_helper * sub_info = subscription_manager.get_subscription(id);
+ REQUIRE(sub_info == NULL);
+
+ // case 2: valid request : get all the keys and use them
+ std::vector<subscription_identifier> key_list;
+ subscription_manager.get_subscription_keys(key_list);
+ REQUIRE(key_list.size() == subscription_manager.num_complete());
+ for(auto &e: key_list){
+ sub_info = subscription_manager.get_subscription(e);
+ REQUIRE(sub_info != NULL);
+ }
+ }
+
+
+
+ SECTION("Delete requests that have succeeeded"){
+ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl;
+
+ subscription_manager.clear();
+
+ int num_sources = 10;
+ int num_sinks = 5;
+ const subscription_response_helper * sub_resp_info;
+
+ std::vector<std::thread> source_list;
+ std::vector<std::thread> sink_list;
+
+ std::vector<int> status_vector(num_sources, 0);
+ std::vector<std::string> gNodeBs;
+
+ // generate the gnodeb list for which we are subscribing
+ // default ran_function_id is zero
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
+
+ // start up the sinks
+ is_running = true;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+ }
+
+ // First do subscriptions ...
+ for(int i = 0; i < num_sources; i++){
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
+ }
+
+
+ for(int i = 0; i < num_sources; i++){
+ source_list[i].join();
+ REQUIRE(status_vector[i] < 0 );
+ subscription_identifier id = std::make_tuple (gNodeBs[i], 0);
+ REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+ sub_resp_info = subscription_manager.get_subscription(id);
+ REQUIRE(sub_resp_info != NULL);
+ REQUIRE(sub_resp_info->get_request_id() == 0);
+
+ }
+
+ REQUIRE(subscription_manager.num_complete() == num_sources);
+ REQUIRE(subscription_manager.num_pending() == 0);
+
+
+ // Store ids ..
+ std::vector<int> completed_requests = status_vector;
+
+ // Delete successes
+ source_list.clear();
+ for(int i = 0; i < num_sources; i++){
+ source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i], std::ref(mock_tx), 0));
+ }
+
+
+ for(int i = 0; i < num_sources; i++){
+ source_list[i].join();
+ REQUIRE(status_vector[i] == SUBSCR_SUCCESS);
+ }
+
+ REQUIRE(subscription_manager.num_pending() == 0);
+
+
+ // stop the sinks
+ is_running =false;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list[i].join();
+ }
+
+ REQUIRE(subscription_manager.num_complete() == 0);
+ REQUIRE(subscription_manager.num_pending() == 0);
+
+ }
+
+
+}
+
+TEST_CASE("Test rainy day scenarios", "E2AP Subscription"){
+
+ subscription_handler subscription_manager;
+
+ mdclog_attr_t *attr;
+ mdclog_attr_init(&attr);
+ mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
+ mdclog_init(attr);
+ mdclog_level_set(MDCLOG_INFO);
+ mdclog_attr_destroy(attr);
+
SECTION("Verify rejection of illegal pdus"){
+
std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl;
subscription_helper subscription_info;
subscription_response_helper subscription_response_info;
int request_id = 1;
int req_seq = 1;
- int message_type = 1;
- int procedure_code = 27;
unsigned char event_buf[] = "Hello world";
size_t event_buf_len = strlen((const char *)event_buf);
subscription_info.add_action(action_id, action_type);
subscription_info.set_event_def(&event_buf[0], event_buf_len);
-
- auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 0);
- res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
+ std::vector<std::string> gNodeBs;
+ auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBID, 0);
+ res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBID, RIC_SUB_REQ, transmitter);
REQUIRE(res == SUBSCR_ERR_ENCODE);
-
-
}
SECTION("Verify subscription request/response fail"){
std::vector<std::thread> sink_list;
std::vector<int> status_vector(num_sources, 0);
+
+ // generate the gnodeb list for which we are subscribing
+ // default ran_function_id is zero
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
// start up the sinks
is_running = true;
for(int i = 0; i < num_sinks; i++){
- sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
}
for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), -1));
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), -1));
}
}
- SECTION("Verify subscription request/response success"){
- std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl;
+ SECTION("Delete requests for non-existent subscription requests"){
+ std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl;
-
- subscription_manager.clear();
+ subscription_manager.clear();
+ subscription_identifier id = std::make_tuple (gNodeBID, 0);
+ REQUIRE(subscription_manager.get_request_status(id) == -1);
+ REQUIRE(subscription_manager.is_subscription_entry(id) == false);
+ REQUIRE(subscription_manager.is_request_entry(id) == false);
int num_sources = 10;
- int num_sinks = 5;
-
- std::vector<std::thread> source_list;
- std::vector<std::thread> sink_list;
+ std::vector<std::thread> source_list;
std::vector<int> status_vector(num_sources, 0);
+ srand(100);
+ std::vector<std::string> gNodeBs;
- // Test null cases in queries
- REQUIRE(subscription_manager.is_subscription_entry(10) == false);
- REQUIRE(subscription_manager.is_request_entry(1) == false);
- REQUIRE(subscription_manager.get_request_status(1) == -1);
-
- // start up the sinks
- is_running = true;
- for(int i = 0; i < num_sinks; i++){
- sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
- }
-
- for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
- }
-
-
+ // generate the gnodeb list for which we are subscribing
+ // default ran_function_id is zero
for(int i = 0; i < num_sources; i++){
- source_list[i].join();
- REQUIRE(status_vector[i] < 0);
- REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
- }
-
- // stop the sinks
- is_running =false;
- for(int i = 0; i < num_sinks; i++){
- sink_list[i].join();
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
}
-
- REQUIRE(subscription_manager.num_complete() == num_sources);
- REQUIRE(subscription_manager.num_pending() == 0);
-
- const subscription_response_helper * sub_info = subscription_manager.get_subscription(-1);
- REQUIRE(sub_info == NULL);
-
- sub_info = subscription_manager.get_subscription(-1 * status_vector[0]);
- REQUIRE(sub_info != NULL);
- REQUIRE(sub_info->get_request_id() == -1 * status_vector[0]);
-
- }
- SECTION("Delete requests for non-existent subscription requests"){
- std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl;
-
- subscription_manager.clear();
- REQUIRE(subscription_manager.get_request_status(0) == -1);
- REQUIRE(subscription_manager.is_subscription_entry(0) == false);
- REQUIRE(subscription_manager.is_request_entry(0) == false);
-
- int num_sources = 10;
-
- std::vector<std::thread> source_list;
- std::vector<int> status_vector(num_sources, 0);
- srand(100);
for(int i = 0; i < num_sources; i++){
int req_id = rand()%1000;
- source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i,req_id , std::ref(mock_tx), 0));
+ source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i,req_id , std::ref(mock_tx), 0));
}
REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING);
}
-
}
-
-
- SECTION("Delete requests that have succeeeded"){
- std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl;
-
- subscription_manager.clear();
-
- int num_sources = 10;
- int num_sinks = 5;
- const subscription_response_helper * sub_resp_info;
-
- std::vector<std::thread> source_list;
- std::vector<std::thread> sink_list;
-
- std::vector<int> status_vector(num_sources, 0);
-
- // start up the sinks
- is_running = true;
- for(int i = 0; i < num_sinks; i++){
- sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
- }
-
- // First do subscriptions ...
- for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
- }
-
-
- for(int i = 0; i < num_sources; i++){
- source_list[i].join();
- REQUIRE(status_vector[i] < 0 );
- REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
- sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
- REQUIRE(sub_resp_info != NULL);
- REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
-
- }
-
- REQUIRE(subscription_manager.num_complete() == num_sources);
- REQUIRE(subscription_manager.num_pending() == 0);
-
-
- // Store ids ..
- std::vector<int> completed_requests = status_vector;
-
- // Delete successes
- source_list.clear();
- for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 0));
- }
-
-
- for(int i = 0; i < num_sources; i++){
- source_list[i].join();
- REQUIRE(status_vector[i] == SUBSCR_SUCCESS);
- }
-
- REQUIRE(subscription_manager.num_pending() == 0);
-
-
- // stop the sinks
- is_running =false;
- for(int i = 0; i < num_sinks; i++){
- sink_list[i].join();
- }
- REQUIRE(subscription_manager.num_complete() == 0);
- REQUIRE(subscription_manager.num_pending() == 0);
-
- }
+
SECTION("Deletes that fail"){
std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl;
// start up the sinks
is_running = true;
for(int i = 0; i < num_sinks; i++){
- sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+ }
+
+ // generate the gnodeb list for which we are subscribing
+ // default ran_function_id is zero
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
}
// First do subscriptions ...
for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
}
for(int i = 0; i < num_sources; i++){
source_list[i].join();
REQUIRE(status_vector[i] < 0 );
- REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
- sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
+ subscription_identifier id = std::make_tuple(gNodeBs[i], 0);
+ REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+ sub_resp_info = subscription_manager.get_subscription(id);
REQUIRE(sub_resp_info != NULL);
- REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
+ REQUIRE(sub_resp_info->get_request_id() == 0);
}
REQUIRE(subscription_manager.num_pending() == 0);
- // Store ids ..
+ // Store status results
std::vector<int> completed_requests = status_vector;
- // Delete failures
+ // Delete failures : mock_tx set to respond with failure
source_list.clear();
for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 1));
+ source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i], std::ref(mock_tx), 1));
}
for(int i = 0; i < num_sources; i++){
sink_list[i].join();
}
- // subscriptions are still there
+ // subscriptions are still there (did not get deleted)
REQUIRE(subscription_manager.num_complete() == num_sources);
REQUIRE(subscription_manager.num_pending() == 0);
std::vector<std::thread> sink_list;
std::vector<int> status_vector(num_sources, 0);
+ std::vector<std::string> gNodeBs;
+
+ // generate the gnodeb list for which we are subscribing
+ // default ran_function_id is zero
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
// start up the sinks
is_running = true;
for(int i = 0; i < num_sinks; i++){
- sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
}
// First do subscriptions ...
for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
}
for(int i = 0; i < num_sources; i++){
source_list[i].join();
REQUIRE(status_vector[i] < 0 );
- REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
- sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
+ subscription_identifier id = std::make_tuple(gNodeBs[i], 0);
+ REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+ sub_resp_info = subscription_manager.get_subscription(id);
REQUIRE(sub_resp_info != NULL);
- REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
+ REQUIRE(sub_resp_info->get_request_id() == 0);
}
// Delete with time-outs
source_list.clear();
for(int i = 0; i < num_sources; i++){
- source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_silent), 0));
+ source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i], std::ref(mock_silent), 0));
}
}
- SECTION("Spurious messages"){
- std::cout <<"+++++++++" << std::endl << "TEST WITH SPURIOUS RESPONSES" << std::endl;
+
- // In this section, we basically inject
- // spurious messages to subscription handler.
- // There are no outcomes. basically
- // handler should be able to ignore these messages
+ SECTION("Verify timeout behaviour if transmitter sends after delay"){
+ std::cout <<"+++++++++" << std::endl << "TEST DELAYED ARRIVAL OF SUBSCRIPTIONS " << std::endl;
+
+ subscription_manager.clear();
+ int num_sources = 10;
+ int num_sinks = 5;
+
+ std::vector<std::thread> source_list;
+ std::vector<std::thread> sink_list;
+
+ std::vector<int> status_vector(num_sources, 0);
- int num_packets = 50;
- int num_sinks = 10;
- std::vector<std::thread> sink_list;
+ // set subscription manager timeout on short fuse
+ int time_out = 1;
+ int num_tries = 1;
+ subscription_manager.set_timeout(time_out);
+ subscription_manager.set_num_retries(num_tries);
+
+ // start up the sinks with delayed response
+ is_running = true;
+ int delay = 5;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), delay));
+ }
+
+
+ // generate the gnodeb list for which we are subscribing
+ // default ran_function_id is zero
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+ }
+
+ for(int i = 0; i < num_sources; i++){
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
+ }
+
+
+ for(int i = 0; i < num_sources; i++){
+ source_list[i].join();
+ REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT);
+ }
+
+ // stop the sinks
+ is_running =false;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list[i].join();
+ }
+
+ REQUIRE(subscription_manager.num_complete() == 0);
+ REQUIRE(subscription_manager.num_pending() == 0);
+
+ }
+
+ SECTION("Duplicate requests"){
+ std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION REQUESTS " << std::endl;
subscription_manager.clear();
- std::cout <<"Message queue size prior to fill = " << message_bus.size() << std::endl;
- random_tx(num_packets);
- std::cout <<"Message queue size post fill = " << message_bus.size() << std::endl;
-
+
+ int num_sources = 20;
+ int num_sinks = 5;
+
+ std::vector<std::thread> source_list;
+ std::vector<std::thread> sink_list;
+
+ std::vector<int> status_vector(num_sources, 0);
+
+ // generate IDENTICAL gnodeb list for which we are subscribing
+ // default ran_function_id is zero
+ std::vector<std::string> gNodeBs;
+ for(int i = 0; i < num_sources; i++){
+ gNodeBs.push_back("test-gnodeb-" + std::to_string(0));
+ }
// start up the sinks
is_running = true;
for(int i = 0; i < num_sinks; i++){
- sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+ }
+
+ // send out subscriptions
+ for(int i = 0; i < num_sources; i++){
+ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
}
- // wait for queue to drain out
- while(! message_bus.empty()){
- sleep(2);
+ // exactly ONE subscription should succeed. all others should fail with SUBSCR_ERR_DUPLICATE
+ for(int i = 0; i < num_sources; i++){
+ source_list[i].join();
+ REQUIRE( (status_vector[i] == -1 || status_vector[i] == SUBSCR_ERR_DUPLICATE));
+
}
// stop the sinks
for(int i = 0; i < num_sinks; i++){
sink_list[i].join();
}
- REQUIRE(subscription_manager.num_complete() == 0);
+
+ REQUIRE(subscription_manager.num_complete() == 1);
+ }
+
+
+ SECTION("Duplicate responses"){
+ // this scenario can happen if there was an initial successful
+ // subscription with <gnodeb-id, ran-function-id> request
+ // followed by another one. The response for the second one should
+ // result in a duplicate subscription error
+
+
+ std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION RESPONSES" << std::endl;
+
+ subscription_manager.clear();
+
+ int num_sources = 1;
+ int num_sinks = 1;
+ std::vector<int> status_vector (num_sources, 0);
+ std::vector<std::string> gNodeBs;
+ gNodeBs.push_back("test-gnodeb");
+
+ std::vector<std::thread> sink_list;
+ // start up the sinks
+ is_running = true;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+ }
+
+ // send a subscription : this should succeed
+ send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0);
+ REQUIRE(status_vector[0] == -1);
REQUIRE(subscription_manager.num_pending() == 0);
+ REQUIRE(subscription_manager.num_complete() == 1);
- std::cout <<"Message queue size at end = " << message_bus.size() << std::endl;
+ // now send same subscription again
+ send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0);
+ REQUIRE(status_vector[0] == SUBSCR_ERR_DUPLICATE);
+ REQUIRE(subscription_manager.num_pending() == 0);
+ REQUIRE(subscription_manager.num_complete() == 1);
+
+ // stop the sinks
+ is_running =false;
+ for(int i = 0; i < num_sinks; i++){
+ sink_list[i].join();
+ }
+
}
-
-};
+
+}