X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Funit_test_subscription_flow.cc;fp=test%2Funit_test_subscription_flow.cc;h=9c99f7777b215dba31210b5dc79967834715d57e;hb=dc68642f5676cdd49f3ef92f983b319e21f16afa;hp=0000000000000000000000000000000000000000;hpb=0054ece5d9d4bcb28ecda2f0f36584f6a64fc869;p=ric-app%2Fadmin.git diff --git a/test/unit_test_subscription_flow.cc b/test/unit_test_subscription_flow.cc new file mode 100644 index 0000000..9c99f77 --- /dev/null +++ b/test/unit_test_subscription_flow.cc @@ -0,0 +1,877 @@ +/*================================================================================== + + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* Author : Ashwin Sridharan + Date : Feb 2019 +*/ + + +/* + Unit testing of subscription handler +*/ + +#define CATCH_CONFIG_MAIN +#include + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +// global queue for testing +std::queue message_bus; + +// global lock for testing +std::mutex get_object ; + +bool is_running = true; + +bool mock_fail(int mtype, size_t len, void * payload, int mode){ + return false; +} + +bool mock_silent(int mtype, size_t len, void * payload, int mode){ + return true; +} + + +bool mock_tx(int mytpe, size_t len, void *payload, int mode){ + + bool res; + int i; + subscription_helper he; + subscription_response_helper he_resp; + + subscription_request sub_req; + subscription_response sub_resp; + + subscription_delete sub_del_req; + subscription_delete_response sub_del_resp; + asn_dec_rval_t retval; + + E2N_E2AP_PDU_t * e2ap_pdu_recv; + unsigned char buffer[256]; + size_t buf_size = 256; + bool msg_ok = false; + + e2ap_pdu_recv = 0; + retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_pdu_recv), payload, len); + if(retval.code != RC_OK){ + std::cerr <<"Error decoding E2N_E2AP Subscription response PDU. Reason = " << strerror(errno) << std::endl; + ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv); + return false; + } + + int procedure_code = e2ap_pdu_recv->choice.initiatingMessage->procedureCode; + + if(procedure_code == E2N_ProcedureCode_id_ricSubscription){ + + he.clear(); + sub_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he); + + + // set up response object + he_resp.set_request(he.get_request_id(), he.get_req_seq()); + he_resp.set_function_id(he.get_function_id()); + i = 0; + + // we simply copy over actions to both admitted and not + // admitted list for now .. + // in future, may need to be more selective + for(auto &e : *(he.get_list())){ + he_resp.add_action(e.get_id()); + he_resp.add_action(e.get_id(), 1, 2); + i++; + } + + if(mode == 0){ + res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true); + if (!res){ + std::cerr << "Error encoding subscription response successful. Reason = " << sub_resp.get_error() << std::endl; + } + else{ + msg_ok = true; + } + } + else{ + res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false); + if (!res){ + std::cerr << "Error encoding subscription response failure . Reason = " << sub_resp.get_error() << std::endl; + } + else{ + msg_ok = true; + } + + }; + + } + + else if (procedure_code == E2N_ProcedureCode_id_ricSubscriptionDelete){ + + he.clear(); + sub_del_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he); + + // set up response object + he_resp.clear(); + he_resp.set_request(he.get_request_id(), he.get_req_seq()); + he_resp.set_function_id(he.get_function_id()); + if(mode == 0){ + res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true); + if (!res){ + std::cerr << "Error encoding subscription delete response failure . Reason = " << sub_resp.get_error() << std::endl; + } + else{ + msg_ok = true; + } + } + else{ + res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false); + if (!res){ + std::cerr << "Error encoding subscription delete response failure . Reason = " << sub_resp.get_error() << std::endl; + } + else{ + msg_ok = true; + std::cout <<"Sending delete failures ..." << std::endl; + } + + } + } + else{ + std::cout <<"Illegal request" << std::endl; + } + + + // push to queue + if(msg_ok){ + std::lock_guard guard(get_object); + std::string msg((char *)buffer, buf_size); + //std::cout <<"Pushed to queue" << std::endl; + message_bus.push(msg); + } + + + ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv); + if(msg_ok) + return true; + + else + return false; + + +} + + + +// Randomly generate number of subscription response and delete +// response packets and push to queue +void random_tx(int num_packets){ + subscription_response_helper he_resp; + subscription_response sub_resp; + subscription_delete sub_del_req; + subscription_delete_response sub_del_resp; + bool res; + unsigned char buffer[256]; + size_t buf_size = 256; + + he_resp.add_action(10); + + // generate subscription responses + for(int i = 0; i < num_packets; i++){ + + // set up response object + he_resp.set_request(i, 1); + he_resp.set_function_id(0); + buf_size = 256; + res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true); + { + std::lock_guard guard(get_object); + std::string msg((char *)buffer, buf_size); + message_bus.push(msg); + } + + buf_size = 256; + res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false); + { + std::lock_guard guard(get_object); + std::string msg((char *)buffer, buf_size); + message_bus.push(msg); + } + + buf_size = 256; + res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true); + { + std::lock_guard guard(get_object); + std::string msg((char *)buffer, buf_size); + message_bus.push(msg); + } + + buf_size = 256; + res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false); + { + std::lock_guard guard(get_object); + std::string msg((char *)buffer, buf_size); + message_bus.push(msg); + } + + + } +} + + +void mock_RAN (subscription_handler &_ref_sub_handler){ + // Behaviour : + + unsigned char incorrect_e2ap[128]; + size_t incorrect_e2ap_size = 128; + for(int i = 0; i < 128; i++){ + incorrect_e2ap[i] = 'b'; + } + + FILE *pfile = fopen("test-data/e2ap_indication_test.per", "r"); + if(pfile == NULL){ + std::cout << "Error opening e2ap_indication_test.per" << std::endl; + exit(-1); + } + unsigned char e2ap_msg[512]; + size_t e2ap_msg_size = fread((char *)e2ap_msg, sizeof(char), 512, pfile); + fclose(pfile); + + unsigned char message_buf[512]; + std::string pdu; + + while(is_running){ + // send some random data, i.e incorrect E2AP + _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size); + //std::cout <<"Sent random data to subscription handler" << std::endl; + + // send an E2AP which is not subscription request + _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size); + //std::cout <<"Sent incorrect e2ap to subscription handler" << std::endl; + + // now look in the queue, pop it and send the data + // finally send correct payload + { + std::lock_guard guard(get_object); + if(! message_bus.empty()){ + pdu = message_bus.front(); + memcpy(message_buf, pdu.c_str(), pdu.length()); + message_bus.pop(); + } + } + + _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.length()); + //std::cout <<"Sent response to subscription handler" << std::endl; + + + + sleep(1); + } + +} + + +void send_request(subscription_handler &subscription_manager, std::vector & status_vector, int index, bool (*tx)(int, size_t, void *, int), int mode ){ + subscription_helper subscription_info; + subscription_request sub_req; + subscription_response_helper subscription_response_info; + + int function_id = 0; + int action_id = 4; + int action_type = 0; + + int request_id = 1; + int req_seq = 1; + int message_type = 1; + int procedure_code = 27; + std::string egnb_id = "Testgnb"; + std::string plmn_id = "Testplmn"; + + unsigned char event_buf[128]; + size_t event_buf_len = 128; + int res; + + + e2sm_event_trigger_helper trigger_data; + e2sm_event_trigger event_trigger; + + trigger_data.egNB_id = egnb_id; + trigger_data.plmn_id = plmn_id; + trigger_data.egNB_id_type = 2; + trigger_data.interface_direction = 1; + trigger_data.procedure_code = procedure_code; + trigger_data.message_type = message_type; + + res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data); + + subscription_info.clear(); + subscription_info.set_request(request_id, req_seq); + subscription_info.set_function_id(function_id); + subscription_info.add_action(action_id, action_type); + subscription_info.set_event_def(&event_buf[0], event_buf_len); + + auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode); + res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter); + + if (res == SUBSCR_SUCCESS ){ + // store -ve of request id + status_vector[index] = -1 * subscription_info.get_request_id(); + } + else{ + status_vector[index] = res; + } + + std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl; +} + +void delete_request(subscription_handler &subscription_manager, std::vector & status_vector, int index, int request_id, bool ( *tx)(int, size_t, void *, int), int mode ){ + + subscription_helper subscription_info; + subscription_response_helper subscription_response_info; + + + //verify subscription deleted + subscription_info.set_request(request_id, 1); + subscription_info.set_function_id(0); + + auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode); + status_vector[index] = subscription_manager.RequestSubscriptionDelete(subscription_info, subscription_response_info, RIC_SUB_DEL_REQ, transmitter); + + +}; + + +TEST_CASE("Test subscription work flow", "E2AP Subscription"){ + + subscription_handler subscription_manager; + + mdclog_attr_t *attr; + mdclog_attr_init(&attr); + mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW "); + mdclog_init(attr); + mdclog_level_set(MDCLOG_DEBUG); + mdclog_attr_destroy(attr); + + unsigned char node_buffer[32]; + std::string gNodeB = "TEST_GNOBDE"; + + std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); + node_buffer[gNodeB.length()] = '\0'; + + + //==================================== + + SECTION("Verify behaviour if no listener "){ + std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl; + + int num_sources = 10; + std::vector status_vector(num_sources, 0); + subscription_manager.clear(); + + std::vector source_list; + + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_fail), 0)); + } + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_ERR_TX); + } + + REQUIRE(subscription_manager.num_complete() == 0); + REQUIRE(subscription_manager.num_pending() == 0); + } + + SECTION("Verify behaviour if listener does not respond"){ + std::cout <<"+++++++++" << std::endl << "TEST WHEN LISTENER DOES NOT RESPOND " << std::endl; + + int num_sources = 10; + std::vector status_vector(num_sources, 0); + + subscription_manager.clear(); + + std::vector source_list; + + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_silent), 0)); + } + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + } + + for(int i = 0; i < num_sources; i++){ + REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT); + } + + REQUIRE(subscription_manager.num_complete() == 0); + REQUIRE(subscription_manager.num_pending() == 0); + + } + + SECTION("Verify timeout behaviour if listener does not response"){ + std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR " << std::endl; + + int res; + int num_sources = 1; + int timeout_val = 2; + int num_tries = 1; + std::vector status_vector(num_sources, 0); + + subscription_manager.clear(); + subscription_manager.set_timeout(timeout_val); + subscription_manager.set_num_retries(num_tries); + + auto start = std::chrono::steady_clock::now(); + send_request(subscription_manager, status_vector, 0, mock_silent, 0); + auto end = std::chrono::steady_clock::now(); + + auto diff = end - start; + + REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT); + REQUIRE(diff.count() >= num_tries * timeout_val); + + num_tries = 2; + subscription_manager.set_num_retries(num_tries); + status_vector.clear(); + + start = std::chrono::steady_clock::now(); + send_request(subscription_manager, status_vector, 0, mock_silent, 0); + end = std::chrono::steady_clock::now(); + + diff = end - start; + + REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT); + REQUIRE(diff.count() >= num_tries * timeout_val); + + } + + SECTION("Verify rejection of illegal pdus"){ + std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl; + subscription_helper subscription_info; + subscription_response_helper subscription_response_info; + + subscription_manager.clear(); + int res; + int function_id = 60000; + int action_id = 4; + int action_type = 0; + + int request_id = 1; + int req_seq = 1; + int message_type = 1; + int procedure_code = 27; + + unsigned char event_buf[] = "Hello world"; + size_t event_buf_len = strlen((const char *)event_buf); + + subscription_info.clear(); + subscription_info.set_request(request_id, req_seq); + subscription_info.set_function_id(function_id); + subscription_info.add_action(action_id, action_type); + subscription_info.set_event_def(&event_buf[0], event_buf_len); + + + auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 0); + res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter); + REQUIRE(res == SUBSCR_ERR_ENCODE); + + + + } + + SECTION("Verify subscription request/response fail"){ + std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION FAILURE " << std::endl; + + subscription_manager.clear(); + + int num_sources = 20; + int num_sinks = 5; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + } + + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), -1)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL); + } + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + + REQUIRE(subscription_manager.num_complete() == 0); + REQUIRE(subscription_manager.num_pending() == 0); + + } + + SECTION("Verify subscription request/response success"){ + std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl; + + + subscription_manager.clear(); + + int num_sources = 10; + int num_sinks = 5; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); + + // Test null cases in queries + REQUIRE(subscription_manager.is_subscription_entry(10) == false); + REQUIRE(subscription_manager.is_request_entry(1) == false); + REQUIRE(subscription_manager.get_request_status(1) == -1); + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + } + + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] < 0); + REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); + } + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + const subscription_response_helper * sub_info = subscription_manager.get_subscription(-1); + REQUIRE(sub_info == NULL); + + sub_info = subscription_manager.get_subscription(-1 * status_vector[0]); + REQUIRE(sub_info != NULL); + REQUIRE(sub_info->get_request_id() == -1 * status_vector[0]); + + } + + SECTION("Delete requests for non-existent subscription requests"){ + std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl; + + subscription_manager.clear(); + REQUIRE(subscription_manager.get_request_status(0) == -1); + REQUIRE(subscription_manager.is_subscription_entry(0) == false); + REQUIRE(subscription_manager.is_request_entry(0) == false); + + int num_sources = 10; + + std::vector source_list; + std::vector status_vector(num_sources, 0); + srand(100); + for(int i = 0; i < num_sources; i++){ + int req_id = rand()%1000; + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i,req_id , std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING); + } + + + } + + + + SECTION("Delete requests that have succeeeded"){ + std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl; + + subscription_manager.clear(); + + int num_sources = 10; + int num_sinks = 5; + const subscription_response_helper * sub_resp_info; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + } + + // First do subscriptions ... + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] < 0 ); + REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); + sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); + REQUIRE(sub_resp_info != NULL); + REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); + + } + + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + + // Store ids .. + std::vector completed_requests = status_vector; + + // Delete successes + source_list.clear(); + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_SUCCESS); + } + + REQUIRE(subscription_manager.num_pending() == 0); + + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + REQUIRE(subscription_manager.num_complete() == 0); + REQUIRE(subscription_manager.num_pending() == 0); + + } + + SECTION("Deletes that fail"){ + + std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl; + + subscription_manager.clear(); + + int num_sources = 10; + int num_sinks = 5; + const subscription_response_helper * sub_resp_info; + + std::vector source_list; + std::vector sink_list; + std::vector status_vector(num_sources, 0); + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + } + + // First do subscriptions ... + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] < 0 ); + REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); + sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); + REQUIRE(sub_resp_info != NULL); + REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); + + } + + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + + // Store ids .. + std::vector completed_requests = status_vector; + + // Delete failures + source_list.clear(); + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 1)); + } + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL); + } + + + // stop the sinks + is_running = false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + + // subscriptions are still there + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + + } + + SECTION("Deletes that timed out "){ + + std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT TIMEOUT " << std::endl; + + subscription_manager.clear(); + + int num_sources = 10; + int num_sinks = 5; + const subscription_response_helper * sub_resp_info; + + std::vector source_list; + std::vector sink_list; + + std::vector status_vector(num_sources, 0); + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + } + + // First do subscriptions ... + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] < 0 ); + REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); + sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); + REQUIRE(sub_resp_info != NULL); + REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); + + } + + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + + // Store ids .. + std::vector completed_requests = status_vector; + + // Delete with time-outs + source_list.clear(); + for(int i = 0; i < num_sources; i++){ + source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_silent), 0)); + } + + + for(int i = 0; i < num_sources; i++){ + source_list[i].join(); + REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT); + } + + // stop the sinks + is_running = false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + + REQUIRE(subscription_manager.num_complete() == num_sources); + REQUIRE(subscription_manager.num_pending() == 0); + + } + + SECTION("Spurious messages"){ + std::cout <<"+++++++++" << std::endl << "TEST WITH SPURIOUS RESPONSES" << std::endl; + + // In this section, we basically inject + // spurious messages to subscription handler. + // There are no outcomes. basically + // handler should be able to ignore these messages + + int num_packets = 50; + int num_sinks = 10; + std::vector sink_list; + + subscription_manager.clear(); + std::cout <<"Message queue size prior to fill = " << message_bus.size() << std::endl; + random_tx(num_packets); + std::cout <<"Message queue size post fill = " << message_bus.size() << std::endl; + + + // start up the sinks + is_running = true; + for(int i = 0; i < num_sinks; i++){ + sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); + } + + // wait for queue to drain out + while(! message_bus.empty()){ + sleep(2); + } + + // stop the sinks + is_running =false; + for(int i = 0; i < num_sinks; i++){ + sink_list[i].join(); + } + REQUIRE(subscription_manager.num_complete() == 0); + REQUIRE(subscription_manager.num_pending() == 0); + + std::cout <<"Message queue size at end = " << message_bus.size() << std::endl; + + } + +}; + + +