1 /*==================================================================================
3 Copyright (c) 2018-2019 AT&T Intellectual Property.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 ==================================================================================
19 /* Author : Ashwin Sridharan
25 Unit testing of subscription handler
28 #define CATCH_CONFIG_MAIN
29 #include <catch2/catch.hpp>
37 #include <subscription_handler.hpp>
41 #include <rmr/RIC_message_types.h>
// Shared state for the mock transport used by the tests in this file.
45 // global queue for testing: mock_tx and random_tx push encoded PDUs onto it, mock_RAN reads from it
46 std::queue<std::string> message_bus;
48 // global lock for testing: taken via std::lock_guard around the message_bus accesses in mock_tx, random_tx and mock_RAN
49 std::mutex get_object ;
// NOTE(review): no line visible in this listing reads or writes is_running; presumably it is the
// stop flag polled by the mock_RAN sink threads. If so, it is shared between threads as a plain
// bool -- consider std::atomic<bool>. TODO confirm against the complete file.
51 bool is_running = true;
// Mock transmit callback used by the "no listener" section, which expects every request that
// goes through it to end with SUBSCR_ERR_TX.
// The body is not visible in this listing; presumably it always reports a transmit failure
// without touching its arguments -- TODO confirm.
53 bool mock_fail(int mtype, size_t len, void * payload, int mode){
// Mock transmit callback used by the sections that expect SUBSCR_ERR_TIMEOUT (and by the
// illegal-PDU section).
// The body is not visible in this listing; presumably it reports a successful send but never
// produces a response, so the handler times out -- TODO confirm.
57 bool mock_silent(int mtype, size_t len, void * payload, int mode){
// Mock transmit callback that plays the RAN side of the exchange.
//
// Decodes `payload` (`len` bytes) as an aligned-PER E2AP PDU, builds the matching subscription or
// subscription-delete response, and pushes the encoded bytes onto message_bus (under get_object)
// for a mock_RAN thread to hand back to the subscription handler.
//
// Parameters:
//   mytpe   - message type supplied by the handler; not referenced on any visible line.
//   len     - size of `payload` in bytes.
//   payload - encoded E2AP request.
//   mode    - bound by the tests (0, -1 or 1). The branch conditions are not visible in this
//             listing; judging from the tests, 0 selects the "successful" response and the other
//             values select the "failure" response -- TODO confirm.
//
// NOTE(review): the return statements are not visible in this listing.
62 bool mock_tx(int mytpe, size_t len, void *payload, int mode){
66 subscription_helper he;
67 subscription_response_helper he_resp;
69 subscription_request sub_req;
70 subscription_response sub_resp;
72 subscription_delete sub_del_req;
73 subscription_delete_response sub_del_resp;
74 asn_dec_rval_t retval;
76 E2N_E2AP_PDU_t * e2ap_pdu_recv;
// Scratch buffer for the encoded response; buf_size is passed by pointer to the encoders and is
// afterwards used as the length of the queued message.
77 unsigned char buffer[256];
78 size_t buf_size = 256;
82 retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_pdu_recv), payload, len);
83 if(retval.code != RC_OK){
84 std::cerr <<"Error decoding E2N_E2AP Subscription response PDU. Reason = " << strerror(errno) << std::endl;
85 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
// NOTE(review): the PDU is assumed to be an initiatingMessage; no check of the PDU's choice is
// visible before this dereference.
89 int procedure_code = e2ap_pdu_recv->choice.initiatingMessage->procedureCode;
91 if(procedure_code == E2N_ProcedureCode_id_ricSubscription){
94 sub_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he);
97 // set up response object
98 he_resp.set_request(he.get_request_id(), he.get_req_seq());
99 he_resp.set_function_id(he.get_function_id());
102 // we simply copy over actions to both admitted and not
103 // admitted list for now ..
104 // in future, may need to be more selective
105 for(auto &e : *(he.get_list())){
106 he_resp.add_action(e.get_id());
107 he_resp.add_action(e.get_id(), 1, 2);
// Successful-outcome subscription response.
112 res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true);
114 std::cerr << "Error encoding subscription response successful. Reason = " << sub_resp.get_error() << std::endl;
// Unsuccessful-outcome subscription response.
121 res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false);
123 std::cerr << "Error encoding subscription response failure . Reason = " << sub_resp.get_error() << std::endl;
133 else if (procedure_code == E2N_ProcedureCode_id_ricSubscriptionDelete){
136 sub_del_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he);
138 // set up response object
140 he_resp.set_request(he.get_request_id(), he.get_req_seq());
141 he_resp.set_function_id(he.get_function_id());
// Successful-outcome delete response. The error text is reported from the encoder that actually
// ran (sub_del_resp), and the message names the "successful" variant, matching the
// subscription-response branch above.
143 res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true);
145 std::cerr << "Error encoding subscription delete response successful. Reason = " << sub_del_resp.get_error() << std::endl;
// Unsuccessful-outcome delete response; the error text again comes from sub_del_resp.
152 res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false);
154 std::cerr << "Error encoding subscription delete response failure . Reason = " << sub_del_resp.get_error() << std::endl;
158 std::cout <<"Sending delete failures ..." << std::endl;
// Any other procedure code is reported and not answered.
164 std::cout <<"Illegal request" << std::endl;
// Publish the encoded response; the queue is shared with the mock_RAN threads, hence the lock.
170 std::lock_guard<std::mutex> guard(get_object);
171 std::string msg((char *)buffer, buf_size);
172 //std::cout <<"Pushed to queue" << std::endl;
173 message_bus.push(msg);
177 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
189 // Generate num_packets rounds of unsolicited responses and push them to the queue.
190 // Each round encodes four PDUs for request id i: subscription response (success and failure)
// and subscription delete response (success and failure). Every encoded buffer is pushed onto
// message_bus under the get_object lock.
191 void random_tx(int num_packets){
192 subscription_response_helper he_resp;
193 subscription_response sub_resp;
// NOTE(review): sub_del_req is not referenced on any line visible in this listing.
194 subscription_delete sub_del_req;
195 subscription_delete_response sub_del_resp;
// buf_size is passed by pointer to each encoder and then used as the length of the queued
// message. NOTE(review): no reset of buf_size back to 256 between encodes is visible in this
// listing; if none exists, every encode after the first sees a smaller buffer -- TODO confirm.
197 unsigned char buffer[256];
198 size_t buf_size = 256;
// A single action (id 10) is shared by every generated response.
200 he_resp.add_action(10);
202 // generate subscription responses
203 for(int i = 0; i < num_packets; i++){
205 // set up response object
206 he_resp.set_request(i, 1);
207 he_resp.set_function_id(0);
// NOTE(review): no check of `res` is visible before the buffer is queued.
209 res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, true);
211 std::lock_guard<std::mutex> guard(get_object);
212 std::string msg((char *)buffer, buf_size);
213 message_bus.push(msg);
217 res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false);
219 std::lock_guard<std::mutex> guard(get_object);
220 std::string msg((char *)buffer, buf_size);
221 message_bus.push(msg);
225 res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, true);
227 std::lock_guard<std::mutex> guard(get_object);
228 std::string msg((char *)buffer, buf_size);
229 message_bus.push(msg);
233 res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size, he_resp, false);
235 std::lock_guard<std::mutex> guard(get_object);
236 std::string msg((char *)buffer, buf_size);
237 message_bus.push(msg);
// Sink thread body: feeds messages into the subscription handler's Response() entry point.
//
// Visible behaviour: it sends a 128-byte buffer of 'b' characters (not valid E2AP), then the
// contents of test-data/e2ap_indication_test.per (a valid E2AP message that is not a subscription
// response), both tagged RIC_INDICATION; it then takes the front of message_bus under the
// get_object lock and forwards it tagged RIC_SUB_RESP.
//
// NOTE(review): the enclosing loop, the declaration of `pdu`, the pop from the queue, the handling
// after a failed fopen and the fclose are not visible in this listing.
245 void mock_RAN (subscription_handler &_ref_sub_handler){
248 unsigned char incorrect_e2ap[128];
249 size_t incorrect_e2ap_size = 128;
250 for(int i = 0; i < 128; i++){
251 incorrect_e2ap[i] = 'b';
// The fixture is PER-encoded binary data, so it is opened in binary mode ("rb"); text mode may
// translate or truncate bytes on platforms that distinguish the two.
254 FILE *pfile = fopen("test-data/e2ap_indication_test.per", "rb");
256 std::cout << "Error opening e2ap_indication_test.per" << std::endl;
259 unsigned char e2ap_msg[512];
260 size_t e2ap_msg_size = fread((char *)e2ap_msg, sizeof(char), 512, pfile);
// Staging buffer for a queued PDU. The producers in this file encode into 256-byte buffers, so a
// queued message fits; the memcpy below does not check the length itself.
263 unsigned char message_buf[512];
267 // send some random data, i.e incorrect E2AP
268 _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size);
269 //std::cout <<"Sent random data to subscription handler" << std::endl;
271 // send an E2AP which is not subscription request
272 _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size);
273 //std::cout <<"Sent incorrect e2ap to subscription handler" << std::endl;
275 // now look in the queue, pop it and send the data
276 // finally send correct payload
278 std::lock_guard<std::mutex> guard(get_object);
279 if(! message_bus.empty()){
280 pdu = message_bus.front();
281 memcpy(message_buf, pdu.c_str(), pdu.length());
// Every queued PDU is forwarded as RIC_SUB_RESP, including the delete responses produced by
// mock_tx and random_tx.
286 _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.length());
287 //std::cout <<"Sent response to subscription handler" << std::endl;
// Thread body that issues one subscription request and records its outcome.
//
// Parameters:
//   subscription_manager - handler under test.
//   status_vector        - shared result vector; only element `index` is written here.
//   index                - slot of status_vector owned by this call.
//   tx                   - mock transmit function (mock_fail / mock_silent / mock_tx).
//   mode                 - bound as the fourth argument of `tx`.
//
// On SUBSCR_SUCCESS the slot receives the negated request id; otherwise it receives the handler's
// result code.
// NOTE(review): the declarations of request_id, req_seq, function_id, action_id, action_type and
// res are not visible in this listing.
297 void send_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, int index, bool (*tx)(int, size_t, void *, int), int mode ){
298 subscription_helper subscription_info;
299 subscription_request sub_req;
300 subscription_response_helper subscription_response_info;
308 int message_type = 1;
309 int procedure_code = 27;
310 std::string egnb_id = "Testgnb";
311 std::string plmn_id = "Testplmn";
// Buffer that receives the encoded E2SM event trigger; event_buf_len is passed by pointer to the
// encoder and then used as the length of the event definition.
313 unsigned char event_buf[128];
314 size_t event_buf_len = 128;
318 e2sm_event_trigger_helper trigger_data;
319 e2sm_event_trigger event_trigger;
321 trigger_data.egNB_id = egnb_id;
322 trigger_data.plmn_id = plmn_id;
323 trigger_data.egNB_id_type = 2;
324 trigger_data.interface_direction = 1;
325 trigger_data.procedure_code = procedure_code;
326 trigger_data.message_type = message_type;
// NOTE(review): no check of this result is visible before `res` is overwritten by
// RequestSubscription below.
328 res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
330 subscription_info.clear();
331 subscription_info.set_request(request_id, req_seq);
332 subscription_info.set_function_id(function_id);
333 subscription_info.add_action(action_id, action_type);
334 subscription_info.set_event_def(&event_buf[0], event_buf_len);
// Adapt the 4-argument mock to the 3-argument transmitter by fixing `mode`.
336 auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
337 res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
339 if (res == SUBSCR_SUCCESS ){
340 // store -ve of request id; the tests check for a value < 0 and negate it to recover the id
341 status_vector[index] = -1 * subscription_info.get_request_id();
344 status_vector[index] = res;
347 std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl;
// Thread body that issues one subscription delete request and stores the handler's result code
// in status_vector[index].
//
// Parameters:
//   subscription_manager - handler under test.
//   status_vector        - shared result vector; only element `index` is written here.
//   index                - slot of status_vector owned by this call.
//   request_id           - id of the subscription to delete.
//   tx                   - mock transmit function.
//   mode                 - bound as the fourth argument of `tx`.
350 void delete_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, int index, int request_id, bool ( *tx)(int, size_t, void *, int), int mode ){
352 subscription_helper subscription_info;
353 subscription_response_helper subscription_response_info;
356 // identify the subscription to delete: caller-supplied request id, sequence number 1, function id 0
357 subscription_info.set_request(request_id, 1);
358 subscription_info.set_function_id(0);
// Adapt the 4-argument mock to the 3-argument transmitter by fixing `mode`.
360 auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
361 status_vector[index] = subscription_manager.RequestSubscriptionDelete(subscription_info, subscription_response_info, RIC_SUB_DEL_REQ, transmitter);
// Catch2 test case covering the subscription request/delete work flow. The lines below are the
// setup shared by the SECTIONs that follow; each SECTION calls subscription_manager.clear()
// before using the handler.
367 TEST_CASE("Test subscription work flow", "E2AP Subscription"){
369 subscription_handler subscription_manager;
// Logging setup. NOTE(review): the declaration of `attr` and any mdclog_init call are not visible
// in this listing.
372 mdclog_attr_init(&attr);
373 mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
375 mdclog_level_set(MDCLOG_DEBUG);
376 mdclog_attr_destroy(attr);
// NUL-terminated copy of the node name. NOTE(review): node_buffer is not referenced on any other
// line visible in this listing.
378 unsigned char node_buffer[32];
379 std::string gNodeB = "TEST_GNOBDE";
381 std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
382 node_buffer[gNodeB.length()] = '\0';
385 //====================================
// Ten concurrent requests sent through mock_fail: each must report SUBSCR_ERR_TX and the handler
// must end with no completed and no pending entries.
387 SECTION("Verify behaviour if no listener "){
388 std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl;
390 int num_sources = 10;
391 std::vector<int> status_vector(num_sources, 0);
392 subscription_manager.clear();
394 std::vector<std::thread> source_list;
// Each thread writes only its own slot i of status_vector.
396 for(int i = 0; i < num_sources; i++){
397 source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_fail), 0));
// A slot is checked only after the thread that writes it has been joined.
400 for(int i = 0; i < num_sources; i++){
401 source_list[i].join();
402 REQUIRE(status_vector[i] == SUBSCR_ERR_TX);
405 REQUIRE(subscription_manager.num_complete() == 0);
406 REQUIRE(subscription_manager.num_pending() == 0);
// Ten concurrent requests sent through mock_silent: each must report SUBSCR_ERR_TIMEOUT and the
// handler must end with no completed and no pending entries.
409 SECTION("Verify behaviour if listener does not respond"){
410 std::cout <<"+++++++++" << std::endl << "TEST WHEN LISTENER DOES NOT RESPOND " << std::endl;
412 int num_sources = 10;
413 std::vector<int> status_vector(num_sources, 0);
415 subscription_manager.clear();
417 std::vector<std::thread> source_list;
419 for(int i = 0; i < num_sources; i++){
420 source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_silent), 0));
// Join every source first, then check the results.
423 for(int i = 0; i < num_sources; i++){
424 source_list[i].join();
427 for(int i = 0; i < num_sources; i++){
428 REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT);
431 REQUIRE(subscription_manager.num_complete() == 0);
432 REQUIRE(subscription_manager.num_pending() == 0);
// Single-threaded check of the timeout/retry path: one request through mock_silent must report
// SUBSCR_ERR_TIMEOUT, and the elapsed time is compared against num_tries * timeout_val. The check
// is run twice, the second time after set_num_retries is called again.
// NOTE(review): the declarations of num_sources, timeout_val and num_tries, and any change to
// num_tries between the two runs, are not visible in this listing.
436 SECTION("Verify timeout behaviour if listener does not response"){
437 std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR " << std::endl;
443 std::vector<int> status_vector(num_sources, 0);
445 subscription_manager.clear();
446 subscription_manager.set_timeout(timeout_val);
447 subscription_manager.set_num_retries(num_tries);
449 auto start = std::chrono::steady_clock::now();
450 send_request(subscription_manager, status_vector, 0, mock_silent, 0);
451 auto end = std::chrono::steady_clock::now();
453 auto diff = end - start;
455 REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT);
// NOTE(review): diff.count() is in steady_clock ticks while timeout_val is presumably in seconds;
// if so this comparison holds almost trivially. Consider
// std::chrono::duration_cast<std::chrono::seconds>(diff).count() -- TODO confirm the unit.
456 REQUIRE(diff.count() >= num_tries * timeout_val);
459 subscription_manager.set_num_retries(num_tries);
// Reset the result slots in place. clear() would leave the vector empty, so the write to
// status_vector[0] inside send_request and the read in the REQUIRE below would be out of bounds.
460 status_vector.assign(num_sources, 0);
462 start = std::chrono::steady_clock::now();
463 send_request(subscription_manager, status_vector, 0, mock_silent, 0);
464 end = std::chrono::steady_clock::now();
468 REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT);
// NOTE(review): this relies on diff having been recomputed from the second start/end pair; that
// assignment is not visible in this listing.
469 REQUIRE(diff.count() >= num_tries * timeout_val);
// A request built from unusual parameters (function id 60000, and an event definition that is
// the raw text "Hello world" rather than an encoded event trigger) must be rejected with
// SUBSCR_ERR_ENCODE.
// NOTE(review): which parameter triggers the encode error cannot be told from this listing. The
// declarations of request_id, req_seq, action_id, action_type and res are not visible either.
473 SECTION("Verify rejection of illegal pdus"){
474 std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl;
475 subscription_helper subscription_info;
476 subscription_response_helper subscription_response_info;
478 subscription_manager.clear();
480 int function_id = 60000;
// NOTE(review): message_type and procedure_code are not referenced on any other visible line of
// this section.
486 int message_type = 1;
487 int procedure_code = 27;
// strlen excludes the terminating NUL, so only the 11 text bytes form the event definition.
489 unsigned char event_buf[] = "Hello world";
490 size_t event_buf_len = strlen((const char *)event_buf);
492 subscription_info.clear();
493 subscription_info.set_request(request_id, req_seq);
494 subscription_info.set_function_id(function_id);
495 subscription_info.add_action(action_id, action_type);
496 subscription_info.set_event_def(&event_buf[0], event_buf_len);
499 auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 0);
500 res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
501 REQUIRE(res == SUBSCR_ERR_ENCODE);
// Twenty concurrent requests sent through mock_tx with mode -1, answered via mock_RAN sink
// threads: each must report SUBSCR_ERR_FAIL and nothing may remain completed or pending.
// NOTE(review): the declaration of num_sinks and the body of the final sink loop (presumably
// stopping and joining the sinks) are not visible in this listing.
507 SECTION("Verify subscription request/response fail"){
508 std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION FAILURE " << std::endl;
510 subscription_manager.clear();
512 int num_sources = 20;
515 std::vector<std::thread> source_list;
516 std::vector<std::thread> sink_list;
518 std::vector<int> status_vector(num_sources, 0);
520 // start up the sinks
522 for(int i = 0; i < num_sinks; i++){
523 sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
526 for(int i = 0; i < num_sources; i++){
527 source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), -1));
// A slot is checked only after the thread that writes it has been joined.
531 for(int i = 0; i < num_sources; i++){
532 source_list[i].join();
533 REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL);
538 for(int i = 0; i < num_sinks; i++){
542 REQUIRE(subscription_manager.num_complete() == 0);
543 REQUIRE(subscription_manager.num_pending() == 0);
// Ten concurrent requests sent through mock_tx with mode 0, answered via mock_RAN sink threads:
// each must succeed (send_request stores the negated request id, hence the `< 0` check) and be
// retrievable from the handler afterwards.
// NOTE(review): the declaration of num_sinks and the body of the final sink loop are not visible
// in this listing.
547 SECTION("Verify subscription request/response success"){
548 std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl;
551 subscription_manager.clear();
553 int num_sources = 10;
556 std::vector<std::thread> source_list;
557 std::vector<std::thread> sink_list;
559 std::vector<int> status_vector(num_sources, 0);
561 // Test null cases in queries: on a freshly cleared handler the lookups report "not found"
562 REQUIRE(subscription_manager.is_subscription_entry(10) == false);
563 REQUIRE(subscription_manager.is_request_entry(1) == false);
564 REQUIRE(subscription_manager.get_request_status(1) == -1);
566 // start up the sinks
568 for(int i = 0; i < num_sinks; i++){
569 sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
572 for(int i = 0; i < num_sources; i++){
573 source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
// Negating the stored value recovers the request id used for the lookup.
577 for(int i = 0; i < num_sources; i++){
578 source_list[i].join();
579 REQUIRE(status_vector[i] < 0);
580 REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
585 for(int i = 0; i < num_sinks; i++){
589 REQUIRE(subscription_manager.num_complete() == num_sources);
590 REQUIRE(subscription_manager.num_pending() == 0);
// An unknown id yields a null pointer; a known id yields the stored response info.
592 const subscription_response_helper * sub_info = subscription_manager.get_subscription(-1);
593 REQUIRE(sub_info == NULL);
595 sub_info = subscription_manager.get_subscription(-1 * status_vector[0]);
596 REQUIRE(sub_info != NULL);
597 REQUIRE(sub_info->get_request_id() == -1 * status_vector[0]);
// Ten concurrent delete requests for ids that were never subscribed (the handler is cleared
// first): each must report SUBSCR_ERR_MISSING.
601 SECTION("Delete requests for non-existent subscription requests"){
602 std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl;
604 subscription_manager.clear();
605 REQUIRE(subscription_manager.get_request_status(0) == -1);
606 REQUIRE(subscription_manager.is_subscription_entry(0) == false);
607 REQUIRE(subscription_manager.is_request_entry(0) == false);
609 int num_sources = 10;
611 std::vector<std::thread> source_list;
612 std::vector<int> status_vector(num_sources, 0);
// Request ids are drawn with rand(); no seeding is visible in this listing.
614 for(int i = 0; i < num_sources; i++){
615 int req_id = rand()%1000;
616 source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i,req_id , std::ref(mock_tx), 0));
620 for(int i = 0; i < num_sources; i++){
621 source_list[i].join();
622 REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING);
// Creates ten subscriptions through mock_tx (mode 0), then deletes each of them through mock_tx
// (mode 0): every delete must report SUBSCR_SUCCESS and the handler must end empty.
// NOTE(review): the declaration of num_sinks, the body of the final sink loop, and whatever
// empties source_list before the delete threads are pushed are not visible in this listing. The
// second join loop indexes from 0, so it relies on the joined subscribe threads having been
// removed first.
630 SECTION("Delete requests that have succeeeded"){
631 std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl;
633 subscription_manager.clear();
635 int num_sources = 10;
637 const subscription_response_helper * sub_resp_info;
639 std::vector<std::thread> source_list;
640 std::vector<std::thread> sink_list;
642 std::vector<int> status_vector(num_sources, 0);
644 // start up the sinks
646 for(int i = 0; i < num_sinks; i++){
647 sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
650 // First do subscriptions ...
651 for(int i = 0; i < num_sources; i++){
652 source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
// send_request stores the negated request id on success; negate again to look it up.
656 for(int i = 0; i < num_sources; i++){
657 source_list[i].join();
658 REQUIRE(status_vector[i] < 0 );
659 REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
660 sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
661 REQUIRE(sub_resp_info != NULL);
662 REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
666 REQUIRE(subscription_manager.num_complete() == num_sources);
667 REQUIRE(subscription_manager.num_pending() == 0);
// Snapshot the (negated) ids, because the delete threads overwrite status_vector with result codes.
671 std::vector<int> completed_requests = status_vector;
675 for(int i = 0; i < num_sources; i++){
676 source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 0));
680 for(int i = 0; i < num_sources; i++){
681 source_list[i].join();
682 REQUIRE(status_vector[i] == SUBSCR_SUCCESS);
685 REQUIRE(subscription_manager.num_pending() == 0);
690 for(int i = 0; i < num_sinks; i++){
693 REQUIRE(subscription_manager.num_complete() == 0);
694 REQUIRE(subscription_manager.num_pending() == 0);
// Creates ten subscriptions through mock_tx (mode 0), then requests their deletion through
// mock_tx with mode 1: every delete must report SUBSCR_ERR_FAIL and all ten subscriptions must
// still be registered.
// NOTE(review): the declaration of num_sinks, the body of the final sink loop, and whatever
// empties source_list before the delete threads are pushed are not visible in this listing.
698 SECTION("Deletes that fail"){
700 std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl;
702 subscription_manager.clear();
704 int num_sources = 10;
706 const subscription_response_helper * sub_resp_info;
708 std::vector<std::thread> source_list;
709 std::vector<std::thread> sink_list;
710 std::vector<int> status_vector(num_sources, 0);
712 // start up the sinks
714 for(int i = 0; i < num_sinks; i++){
715 sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
718 // First do subscriptions ...
719 for(int i = 0; i < num_sources; i++){
720 source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
// send_request stores the negated request id on success; negate again to look it up.
724 for(int i = 0; i < num_sources; i++){
725 source_list[i].join();
726 REQUIRE(status_vector[i] < 0 );
727 REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
728 sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
729 REQUIRE(sub_resp_info != NULL);
730 REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
734 REQUIRE(subscription_manager.num_complete() == num_sources);
735 REQUIRE(subscription_manager.num_pending() == 0);
// Snapshot the (negated) ids, because the delete threads overwrite status_vector with result codes.
739 std::vector<int> completed_requests = status_vector;
743 for(int i = 0; i < num_sources; i++){
744 source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_tx), 1));
747 for(int i = 0; i < num_sources; i++){
748 source_list[i].join();
749 REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL);
755 for(int i = 0; i < num_sinks; i++){
759 // subscriptions are still there
760 REQUIRE(subscription_manager.num_complete() == num_sources);
761 REQUIRE(subscription_manager.num_pending() == 0);
// Creates ten subscriptions through mock_tx (mode 0), then requests their deletion through
// mock_silent: every delete must report SUBSCR_ERR_TIMEOUT and all ten subscriptions must still
// be registered.
// NOTE(review): the declaration of num_sinks, the body of the final sink loop, and whatever
// empties source_list before the delete threads are pushed are not visible in this listing.
766 SECTION("Deletes that timed out "){
768 std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT TIMEOUT " << std::endl;
770 subscription_manager.clear();
772 int num_sources = 10;
774 const subscription_response_helper * sub_resp_info;
776 std::vector<std::thread> source_list;
777 std::vector<std::thread> sink_list;
779 std::vector<int> status_vector(num_sources, 0);
781 // start up the sinks
783 for(int i = 0; i < num_sinks; i++){
784 sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
787 // First do subscriptions ...
788 for(int i = 0; i < num_sources; i++){
789 source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
// send_request stores the negated request id on success; negate again to look it up.
793 for(int i = 0; i < num_sources; i++){
794 source_list[i].join();
795 REQUIRE(status_vector[i] < 0 );
796 REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
797 sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
798 REQUIRE(sub_resp_info != NULL);
799 REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
803 REQUIRE(subscription_manager.num_complete() == num_sources);
804 REQUIRE(subscription_manager.num_pending() == 0);
// Snapshot the (negated) ids, because the delete threads overwrite status_vector with result codes.
808 std::vector<int> completed_requests = status_vector;
810 // Delete with time-outs
812 for(int i = 0; i < num_sources; i++){
813 source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i], std::ref(mock_silent), 0));
817 for(int i = 0; i < num_sources; i++){
818 source_list[i].join();
819 REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT);
824 for(int i = 0; i < num_sinks; i++){
// The timed-out deletes leave the subscriptions registered.
828 REQUIRE(subscription_manager.num_complete() == num_sources);
829 REQUIRE(subscription_manager.num_pending() == 0);
// Fills the queue with unsolicited responses via random_tx(50), lets mock_RAN sink threads feed
// them to a cleared handler, and checks that the handler ends with no completed and no pending
// entries.
// NOTE(review): the declaration of num_sinks and the bodies of the drain loop and of the final
// sink loop are not visible in this listing.
833 SECTION("Spurious messages"){
834 std::cout <<"+++++++++" << std::endl << "TEST WITH SPURIOUS RESPONSES" << std::endl;
836 // In this section, we basically inject
837 // spurious messages to subscription handler.
838 // There are no outcomes. basically
839 // handler should be able to ignore these messages
841 int num_packets = 50;
843 std::vector<std::thread> sink_list;
845 subscription_manager.clear();
846 std::cout <<"Message queue size prior to fill = " << message_bus.size() << std::endl;
847 random_tx(num_packets);
848 std::cout <<"Message queue size post fill = " << message_bus.size() << std::endl;
851 // start up the sinks
853 for(int i = 0; i < num_sinks; i++){
854 sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
857 // wait for queue to drain out
// NOTE(review): message_bus.empty() is read here without the get_object lock while the sink
// threads started above access the queue under that lock; as written this is an unsynchronized
// read of shared state -- consider taking the lock for the check.
858 while(! message_bus.empty()){
864 for(int i = 0; i < num_sinks; i++){
867 REQUIRE(subscription_manager.num_complete() == 0);
868 REQUIRE(subscription_manager.num_pending() == 0);
870 std::cout <<"Message queue size at end = " << message_bus.size() << std::endl;