/*================================================================================== Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ================================================================================== */ /* Author : Ashwin Sridharan Date : Feb 2019 */ /* Unit testing of subscription handler */ #define CATCH_CONFIG_MAIN #include #include #include #include #include #include #include #include #include #include #include #include // global queue for testing std::queue message_bus; // global lock for testing std::mutex get_object ; bool is_running = true; bool mock_fail(int mtype, size_t len, void * payload, int mode){ return false; } bool mock_silent(int mtype, size_t len, void * payload, int mode){ return true; } bool mock_tx(int mytpe, size_t len, void *payload, int mode){ bool res; int i; subscription_helper he; subscription_response_helper he_resp; subscription_request sub_req; subscription_response sub_resp; subscription_delete sub_del_req; subscription_delete_response sub_del_resp; asn_dec_rval_t retval; E2N_E2AP_PDU_t * e2ap_pdu_recv; unsigned char buffer[256]; size_t buf_size = 256; bool msg_ok = false; e2ap_pdu_recv = 0; retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_pdu_recv), payload, len); if(retval.code != RC_OK){ std::cerr <<"Error decoding E2N_E2AP Subscription response PDU. Reason = " << strerror(errno) << std::endl; ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv); return false; } int procedure_code = e2ap_pdu_recv->choice.initiatingMessage->procedureCode; if(procedure_code == E2N_ProcedureCode_id_ricSubscription){ he.clear(); sub_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he); // set up response object he_resp.set_request(he.get_request_id(), he.get_req_seq()); he_resp.set_function_id(he.get_function_id()); i = 0; // we simply copy over actions to both admitted and not // admitted list for now .. // in future, may need to be more selective for(auto &e : *(he.get_list())){ he_resp.add_action(e.get_id()); he_resp.add_action(e.get_id(), 1, 2); i++; } if(mode == 0){ res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true); if (!res){ std::cerr << "Error encoding subscription response successful. Reason = " << sub_resp.get_error() << std::endl; } else{ msg_ok = true; } } else{ res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false); if (!res){ std::cerr << "Error encoding subscription response failure . Reason = " << sub_resp.get_error() << std::endl; } else{ msg_ok = true; } }; } else if (procedure_code == E2N_ProcedureCode_id_ricSubscriptionDelete){ he.clear(); sub_del_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he); // set up response object he_resp.clear(); he_resp.set_request(he.get_request_id(), he.get_req_seq()); he_resp.set_function_id(he.get_function_id()); if(mode == 0){ res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true); if (!res){ std::cerr << "Error encoding subscription delete response failure . Reason = " << sub_resp.get_error() << std::endl; } else{ msg_ok = true; } } else{ res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false); if (!res){ std::cerr << "Error encoding subscription delete response failure . Reason = " << sub_resp.get_error() << std::endl; } else{ msg_ok = true; std::cout <<"Sending delete failures ..." << std::endl; } } } else{ std::cout <<"Illegal request" << std::endl; } // push to queue if(msg_ok){ std::lock_guard guard(get_object); std::string msg((char *)buffer, buf_size); //std::cout <<"Pushed to queue" << std::endl; message_bus.push(msg); } ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv); if(msg_ok) return true; else return false; } // Randomly generate number of subscription response and delete // response packets and push to queue void random_tx(int num_packets){ subscription_response_helper he_resp; subscription_response sub_resp; subscription_delete sub_del_req; subscription_delete_response sub_del_resp; bool res; unsigned char buffer[256]; size_t buf_size = 256; he_resp.add_action(10); // generate subscription responses for(int i = 0; i < num_packets; i++){ // set up response object he_resp.set_request(i, 1); he_resp.set_function_id(0); buf_size = 256; res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true); { std::lock_guard guard(get_object); std::string msg((char *)buffer, buf_size); message_bus.push(msg); } buf_size = 256; res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false); { std::lock_guard guard(get_object); std::string msg((char *)buffer, buf_size); message_bus.push(msg); } buf_size = 256; res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true); { std::lock_guard guard(get_object); std::string msg((char *)buffer, buf_size); message_bus.push(msg); } buf_size = 256; res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false); { std::lock_guard guard(get_object); std::string msg((char *)buffer, buf_size); message_bus.push(msg); } } } void mock_RAN (subscription_handler &_ref_sub_handler){ // Behaviour : unsigned char incorrect_e2ap[128]; size_t incorrect_e2ap_size = 128; for(int i = 0; i < 128; i++){ incorrect_e2ap[i] = 'b'; } FILE *pfile = fopen("test-data/e2ap_indication_test.per", "r"); if(pfile == NULL){ std::cout << "Error opening e2ap_indication_test.per" << std::endl; exit(-1); } unsigned char e2ap_msg[512]; size_t e2ap_msg_size = fread((char *)e2ap_msg, sizeof(char), 512, pfile); fclose(pfile); unsigned char message_buf[512]; std::string pdu; while(is_running){ // send some random data, i.e incorrect E2AP _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size); //std::cout <<"Sent random data to subscription handler" << std::endl; // send an E2AP which is not subscription request _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size); //std::cout <<"Sent incorrect e2ap to subscription handler" << std::endl; // now look in the queue, pop it and send the data // finally send correct payload { std::lock_guard guard(get_object); if(! message_bus.empty()){ pdu = message_bus.front(); memcpy(message_buf, pdu.c_str(), pdu.length()); message_bus.pop(); } } _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.length()); //std::cout <<"Sent response to subscription handler" << std::endl; sleep(1); } } void send_request(subscription_handler &subscription_manager, std::vector & status_vector, int index, bool (*tx)(int, size_t, void *, int), int mode ){ subscription_helper subscription_info; subscription_request sub_req; subscription_response_helper subscription_response_info; int function_id = 0; int action_id = 4; int action_type = 0; int request_id = 1; int req_seq = 1; int message_type = 1; int procedure_code = 27; std::string egnb_id = "Testgnb"; std::string plmn_id = "Testplmn"; unsigned char event_buf[128]; size_t event_buf_len = 128; int res; e2sm_event_trigger_helper trigger_data; e2sm_event_trigger event_trigger; trigger_data.egNB_id = egnb_id; trigger_data.plmn_id = plmn_id; trigger_data.egNB_id_type = 2; trigger_data.interface_direction = 1; trigger_data.procedure_code = procedure_code; trigger_data.message_type = message_type; res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data); subscription_info.clear(); subscription_info.set_request(request_id, req_seq); subscription_info.set_function_id(function_id); subscription_info.add_action(action_id, action_type); subscription_info.set_event_def(&event_buf[0], event_buf_len); auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode); res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter); if (res == SUBSCR_SUCCESS ){ // store -ve of request id status_vector[index] = -1 * subscription_info.get_request_id(); } else{ status_vector[index] = res; } std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl; } void delete_request(subscription_handler &subscription_manager, std::vector & status_vector, int index, int request_id, bool ( *tx)(int, size_t, void *, int), int mode ){ subscription_helper subscription_info; subscription_response_helper subscription_response_info; //verify subscription deleted subscription_info.set_request(request_id, 1); subscription_info.set_function_id(0); auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode); status_vector[index] = subscription_manager.RequestSubscriptionDelete(subscription_info, subscription_response_info, RIC_SUB_DEL_REQ, transmitter); }; TEST_CASE("Test subscription work flow", "E2AP Subscription"){ subscription_handler subscription_manager; mdclog_attr_t *attr; mdclog_attr_init(&attr); mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW "); mdclog_init(attr); mdclog_level_set(MDCLOG_DEBUG); mdclog_attr_destroy(attr); unsigned char node_buffer[32]; std::string gNodeB = "TEST_GNOBDE"; std::copy(gNodeB.begin(), gNodeB.end(), node_buffer); node_buffer[gNodeB.length()] = '\0'; //==================================== SECTION("Verify behaviour if no listener "){ std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl; int num_sources = 10; std::vector status_vector(num_sources, 0); subscription_manager.clear(); std::vector source_list; for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_fail), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] == SUBSCR_ERR_TX); } REQUIRE(subscription_manager.num_complete() == 0); REQUIRE(subscription_manager.num_pending() == 0); } SECTION("Verify behaviour if listener does not respond"){ std::cout <<"+++++++++" << std::endl << "TEST WHEN LISTENER DOES NOT RESPOND " << std::endl; int num_sources = 10; std::vector status_vector(num_sources, 0); subscription_manager.clear(); std::vector source_list; for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_silent), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); } for(int i = 0; i < num_sources; i++){ REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT); } REQUIRE(subscription_manager.num_complete() == 0); REQUIRE(subscription_manager.num_pending() == 0); } SECTION("Verify timeout behaviour if listener does not response"){ std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR " << std::endl; int res; int num_sources = 1; int timeout_val = 2; int num_tries = 1; std::vector status_vector(num_sources, 0); subscription_manager.clear(); subscription_manager.set_timeout(timeout_val); subscription_manager.set_num_retries(num_tries); auto start = std::chrono::steady_clock::now(); send_request(subscription_manager, status_vector, 0, mock_silent, 0); auto end = std::chrono::steady_clock::now(); auto diff = end - start; REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT); REQUIRE(diff.count() >= num_tries * timeout_val); num_tries = 2; subscription_manager.set_num_retries(num_tries); status_vector.clear(); start = std::chrono::steady_clock::now(); send_request(subscription_manager, status_vector, 0, mock_silent, 0); end = std::chrono::steady_clock::now(); diff = end - start; REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT); REQUIRE(diff.count() >= num_tries * timeout_val); } SECTION("Verify rejection of illegal pdus"){ std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl; subscription_helper subscription_info; subscription_response_helper subscription_response_info; subscription_manager.clear(); int res; int function_id = 60000; int action_id = 4; int action_type = 0; int request_id = 1; int req_seq = 1; int message_type = 1; int procedure_code = 27; unsigned char event_buf[] = "Hello world"; size_t event_buf_len = strlen((const char *)event_buf); subscription_info.clear(); subscription_info.set_request(request_id, req_seq); subscription_info.set_function_id(function_id); subscription_info.add_action(action_id, action_type); subscription_info.set_event_def(&event_buf[0], event_buf_len); auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 0); res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter); REQUIRE(res == SUBSCR_ERR_ENCODE); } SECTION("Verify subscription request/response fail"){ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION FAILURE " << std::endl; subscription_manager.clear(); int num_sources = 20; int num_sinks = 5; std::vector source_list; std::vector sink_list; std::vector status_vector(num_sources, 0); // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); } for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), -1)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL); } // stop the sinks is_running =false; for(int i = 0; i < num_sinks; i++){ sink_list[i].join(); } REQUIRE(subscription_manager.num_complete() == 0); REQUIRE(subscription_manager.num_pending() == 0); } SECTION("Verify subscription request/response success"){ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl; subscription_manager.clear(); int num_sources = 10; int num_sinks = 5; std::vector source_list; std::vector sink_list; std::vector status_vector(num_sources, 0); // Test null cases in queries REQUIRE(subscription_manager.is_subscription_entry(10) == false); REQUIRE(subscription_manager.is_request_entry(1) == false); REQUIRE(subscription_manager.get_request_status(1) == -1); // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); } for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] < 0); REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); } // stop the sinks is_running =false; for(int i = 0; i < num_sinks; i++){ sink_list[i].join(); } REQUIRE(subscription_manager.num_complete() == num_sources); REQUIRE(subscription_manager.num_pending() == 0); const subscription_response_helper * sub_info = subscription_manager.get_subscription(-1); REQUIRE(sub_info == NULL); sub_info = subscription_manager.get_subscription(-1 * status_vector[0]); REQUIRE(sub_info != NULL); REQUIRE(sub_info->get_request_id() == -1 * status_vector[0]); } SECTION("Delete requests for non-existent subscription requests"){ std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl; subscription_manager.clear(); REQUIRE(subscription_manager.get_request_status(0) == -1); REQUIRE(subscription_manager.is_subscription_entry(0) == false); REQUIRE(subscription_manager.is_request_entry(0) == false); int num_sources = 10; std::vector source_list; std::vector status_vector(num_sources, 0); srand(100); for(int i = 0; i < num_sources; i++){ int req_id = rand()%1000; source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i,req_id , std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING); } } SECTION("Delete requests that have succeeeded"){ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl; subscription_manager.clear(); int num_sources = 10; int num_sinks = 5; const subscription_response_helper * sub_resp_info; std::vector source_list; std::vector sink_list; std::vector status_vector(num_sources, 0); // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); } // First do subscriptions ... for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] < 0 ); REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); REQUIRE(sub_resp_info != NULL); REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); } REQUIRE(subscription_manager.num_complete() == num_sources); REQUIRE(subscription_manager.num_pending() == 0); // Store ids .. std::vector completed_requests = status_vector; // Delete successes source_list.clear(); for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] == SUBSCR_SUCCESS); } REQUIRE(subscription_manager.num_pending() == 0); // stop the sinks is_running =false; for(int i = 0; i < num_sinks; i++){ sink_list[i].join(); } REQUIRE(subscription_manager.num_complete() == 0); REQUIRE(subscription_manager.num_pending() == 0); } SECTION("Deletes that fail"){ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl; subscription_manager.clear(); int num_sources = 10; int num_sinks = 5; const subscription_response_helper * sub_resp_info; std::vector source_list; std::vector sink_list; std::vector status_vector(num_sources, 0); // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); } // First do subscriptions ... for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] < 0 ); REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); REQUIRE(sub_resp_info != NULL); REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); } REQUIRE(subscription_manager.num_complete() == num_sources); REQUIRE(subscription_manager.num_pending() == 0); // Store ids .. std::vector completed_requests = status_vector; // Delete failures source_list.clear(); for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 1)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL); } // stop the sinks is_running = false; for(int i = 0; i < num_sinks; i++){ sink_list[i].join(); } // subscriptions are still there REQUIRE(subscription_manager.num_complete() == num_sources); REQUIRE(subscription_manager.num_pending() == 0); } SECTION("Deletes that timed out "){ std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT TIMEOUT " << std::endl; subscription_manager.clear(); int num_sources = 10; int num_sinks = 5; const subscription_response_helper * sub_resp_info; std::vector source_list; std::vector sink_list; std::vector status_vector(num_sources, 0); // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); } // First do subscriptions ... for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] < 0 ); REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true); sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]); REQUIRE(sub_resp_info != NULL); REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]); } REQUIRE(subscription_manager.num_complete() == num_sources); REQUIRE(subscription_manager.num_pending() == 0); // Store ids .. std::vector completed_requests = status_vector; // Delete with time-outs source_list.clear(); for(int i = 0; i < num_sources; i++){ source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_silent), 0)); } for(int i = 0; i < num_sources; i++){ source_list[i].join(); REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT); } // stop the sinks is_running = false; for(int i = 0; i < num_sinks; i++){ sink_list[i].join(); } REQUIRE(subscription_manager.num_complete() == num_sources); REQUIRE(subscription_manager.num_pending() == 0); } SECTION("Spurious messages"){ std::cout <<"+++++++++" << std::endl << "TEST WITH SPURIOUS RESPONSES" << std::endl; // In this section, we basically inject // spurious messages to subscription handler. // There are no outcomes. basically // handler should be able to ignore these messages int num_packets = 50; int num_sinks = 10; std::vector sink_list; subscription_manager.clear(); std::cout <<"Message queue size prior to fill = " << message_bus.size() << std::endl; random_tx(num_packets); std::cout <<"Message queue size post fill = " << message_bus.size() << std::endl; // start up the sinks is_running = true; for(int i = 0; i < num_sinks; i++){ sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager))); } // wait for queue to drain out while(! message_bus.empty()){ sleep(2); } // stop the sinks is_running =false; for(int i = 0; i < num_sinks; i++){ sink_list[i].join(); } REQUIRE(subscription_manager.num_complete() == 0); REQUIRE(subscription_manager.num_pending() == 0); std::cout <<"Message queue size at end = " << message_bus.size() << std::endl; } };