1. Transitioned to using latest asn1c compiler
[ric-app/admin.git] / test / unit_test_subscription_flow.cc
1 /*==================================================================================
2
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date   : Feb 2019
21 */
22
23
24 /* 
25    Unit testing of subscription handler
26 */
27
28 #define CATCH_CONFIG_MAIN
29 #include <catch2/catch.hpp>
30
31
32 #include <string.h>
33 #include <stdio.h>
34 #include <iostream>
35 #include <csignal>
36 #include <chrono>
37 #include <subscription_handler.hpp>
38 #include <e2sm.hpp>
39 #include <queue>
40 #include <mutex>
41 #include <rmr/RIC_message_types.h>
42 #include <thread>
43
44
45 // global queue for testing
46 std::queue<std::string> message_bus;
47
48 // global lock for testing
49 std::mutex get_object ;
50
51 bool is_running = true;
52
53 bool mock_fail(int mtype,  size_t len,  void * payload, int mode){
54   return false;
55 }
56
57 bool mock_silent(int mtype,  size_t len, void * payload, int mode){
58   return true;
59 }
60
61
62 bool mock_tx(int mytpe,  size_t len,  void *payload, int mode){
63
64   bool res;
65   int i;
66   subscription_helper he;
67   subscription_response_helper he_resp;
68   
69   subscription_request sub_req;
70   subscription_response sub_resp;
71
72   subscription_delete sub_del_req;
73   subscription_delete_response sub_del_resp;
74   asn_dec_rval_t retval;
75
76   E2N_E2AP_PDU_t * e2ap_pdu_recv;
77   unsigned char buffer[256];
78   size_t buf_size = 256;
79   bool msg_ok = false;
80   
81   e2ap_pdu_recv = 0;
82   retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_pdu_recv), payload, len);
83   if(retval.code != RC_OK){
84     std::cerr <<"Error decoding E2N_E2AP Subscription response PDU. Reason = " << strerror(errno) << std::endl;
85     ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
86     return false;
87   }
88   
89   int procedure_code = e2ap_pdu_recv->choice.initiatingMessage->procedureCode;
90   
91   if(procedure_code == E2N_ProcedureCode_id_ricSubscription){
92
93     he.clear();
94     sub_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he);
95     
96  
97     // set up response object
98     he_resp.set_request(he.get_request_id(), he.get_req_seq());
99     he_resp.set_function_id(he.get_function_id());
100     i = 0;
101
102     // we simply copy over actions to both admitted and not
103     // admitted list for now ..
104     // in future, may need to be more selective
105     for(auto &e : *(he.get_list())){
106         he_resp.add_action(e.get_id());
107         he_resp.add_action(e.get_id(), 1, 2);
108       i++;
109     }
110     
111     if(mode == 0){
112       res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size,  he_resp, true);
113       if (!res){
114         std::cerr << "Error encoding subscription response successful. Reason = " << sub_resp.get_error() << std::endl;
115       }
116       else{
117         msg_ok = true;
118       }
119     }
120     else{
121       res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false);
122       if (!res){
123         std::cerr << "Error encoding subscription response failure . Reason = " << sub_resp.get_error() << std::endl;
124       }
125       else{
126         msg_ok = true;
127       }
128
129     };
130   
131   }
132
133   else if (procedure_code == E2N_ProcedureCode_id_ricSubscriptionDelete){
134
135     he.clear();
136     sub_del_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he);
137     
138     // set up response object
139     he_resp.clear();
140     he_resp.set_request(he.get_request_id(), he.get_req_seq());
141     he_resp.set_function_id(he.get_function_id());
142     if(mode == 0){
143       res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, true);
144       if (!res){
145         std::cerr << "Error encoding subscription delete  response failure . Reason = " << sub_resp.get_error() << std::endl;
146       }
147       else{
148         msg_ok = true;
149       }
150     }
151     else{
152       res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, false);
153       if (!res){
154         std::cerr << "Error encoding subscription delete  response failure . Reason = " << sub_resp.get_error() << std::endl;
155       }
156       else{
157         msg_ok = true;
158         std::cout <<"Sending delete failures ..." << std::endl;
159       }
160       
161     }
162   }
163   else{
164     std::cout <<"Illegal request" << std::endl;
165   }
166
167   
168   // push to queue
169   if(msg_ok){
170     std::lock_guard<std::mutex> guard(get_object);
171     std::string msg((char *)buffer, buf_size);
172     //std::cout <<"Pushed to queue" << std::endl;
173     message_bus.push(msg);
174   }
175
176   
177   ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
178   if(msg_ok)
179     return true;
180   
181   else
182     return false;
183     
184     
185 }
186
187
188
189 // Randomly generate number of subscription response and delete
190 // response packets and push to queue
191 void random_tx(int num_packets){
192   subscription_response_helper he_resp;
193   subscription_response sub_resp;
194   subscription_delete sub_del_req;
195   subscription_delete_response sub_del_resp;
196   bool res;
197   unsigned char buffer[256];
198   size_t buf_size = 256;
199
200   he_resp.add_action(10);
201   
202   // generate subscription responses
203   for(int i = 0; i < num_packets; i++){
204     
205     // set up response object
206     he_resp.set_request(i, 1);
207     he_resp.set_function_id(0); 
208     buf_size = 256;
209     res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size,  he_resp, true);
210     {
211         std::lock_guard<std::mutex> guard(get_object);
212         std::string msg((char *)buffer, buf_size);
213         message_bus.push(msg);
214     }
215     
216     buf_size = 256;
217     res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size,  he_resp, false);
218     {
219         std::lock_guard<std::mutex> guard(get_object);
220         std::string msg((char *)buffer, buf_size);
221         message_bus.push(msg);
222     }
223     
224     buf_size = 256;
225     res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, true);
226     {
227       std::lock_guard<std::mutex> guard(get_object);
228       std::string msg((char *)buffer, buf_size);
229       message_bus.push(msg);
230     }
231
232     buf_size = 256;
233     res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, false);
234     {
235       std::lock_guard<std::mutex> guard(get_object);
236       std::string msg((char *)buffer, buf_size);
237       message_bus.push(msg);
238     }
239     
240     
241   }
242 }
243
244
245 void  mock_RAN (subscription_handler &_ref_sub_handler){
246   // Behaviour :
247   
248   unsigned char incorrect_e2ap[128];
249   size_t incorrect_e2ap_size = 128;
250   for(int i = 0; i < 128; i++){
251     incorrect_e2ap[i] = 'b';
252   }
253
254   FILE *pfile = fopen("test-data/e2ap_indication_test.per", "r");
255   if(pfile == NULL){
256     std::cout <<  "Error opening e2ap_indication_test.per" << std::endl;
257     exit(-1);
258   }
259   unsigned char e2ap_msg[512];
260   size_t e2ap_msg_size = fread((char *)e2ap_msg, sizeof(char), 512, pfile);
261   fclose(pfile);
262
263   unsigned char message_buf[512];
264   std::string pdu;
265   
266   while(is_running){
267     // send some random data, i.e incorrect E2AP
268     _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size);
269     //std::cout <<"Sent random data to subscription handler" << std::endl;
270     
271     //  send an E2AP which is not subscription request
272     _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size);
273     //std::cout <<"Sent incorrect e2ap  to subscription handler" << std::endl;
274     
275     // now look in the queue, pop it and send the data
276     // finally send correct payload
277     {
278       std::lock_guard<std::mutex> guard(get_object);
279       if(! message_bus.empty()){
280         pdu  = message_bus.front();
281         memcpy(message_buf,  pdu.c_str(), pdu.length());
282         message_bus.pop();
283       }
284     }
285     
286     _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.length());
287     //std::cout <<"Sent  response to subscription handler" << std::endl;
288       
289     
290     
291     sleep(1);
292   }
293   
294 }
295
296
297 void send_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, int index, bool (*tx)(int,  size_t, void *, int), int mode ){
298   subscription_helper subscription_info;
299   subscription_request sub_req;
300   subscription_response_helper subscription_response_info;
301
302   int function_id = 0;
303   int action_id = 4;
304   int action_type = 0;
305
306   int request_id = 1;
307   int req_seq = 1;
308   int message_type = 1;
309   int procedure_code = 27;
310   std::string egnb_id = "Testgnb";
311   std::string plmn_id = "Testplmn";
312   
313   unsigned char event_buf[128];
314   size_t event_buf_len = 128;
315   int res;    
316
317
318   e2sm_event_trigger_helper trigger_data;
319   e2sm_event_trigger event_trigger;
320   
321   trigger_data.egNB_id = egnb_id;
322   trigger_data.plmn_id = plmn_id;
323   trigger_data.egNB_id_type = 2;
324   trigger_data.interface_direction = 1;
325   trigger_data.procedure_code = procedure_code;
326   trigger_data.message_type = message_type;
327
328   res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
329   
330   subscription_info.clear();
331   subscription_info.set_request(request_id, req_seq);
332   subscription_info.set_function_id(function_id);
333   subscription_info.add_action(action_id, action_type);
334   subscription_info.set_event_def(&event_buf[0], event_buf_len);
335
336   auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
337   res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
338   
339   if (res == SUBSCR_SUCCESS ){
340     // store -ve of request id 
341     status_vector[index] = -1 * subscription_info.get_request_id();
342   }
343   else{
344     status_vector[index] = res;
345   }
346
347   std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl;
348 }
349
350 void delete_request(subscription_handler &subscription_manager, std::vector<int> & status_vector,  int index, int request_id, bool ( *tx)(int,  size_t, void *, int), int mode ){
351
352   subscription_helper subscription_info;
353   subscription_response_helper subscription_response_info;
354
355  
356   //verify subscription deleted
357   subscription_info.set_request(request_id, 1);
358   subscription_info.set_function_id(0);
359
360   auto transmitter = std::bind(tx,  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
361   status_vector[index] = subscription_manager.RequestSubscriptionDelete(subscription_info, subscription_response_info, RIC_SUB_DEL_REQ, transmitter);
362   
363          
364 };
365
366
367 TEST_CASE("Test subscription work flow", "E2AP Subscription"){
368
369   subscription_handler subscription_manager;
370     
371   mdclog_attr_t *attr;
372   mdclog_attr_init(&attr);
373   mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
374   mdclog_init(attr);
375   mdclog_level_set(MDCLOG_DEBUG);
376   mdclog_attr_destroy(attr);
377
378   unsigned char node_buffer[32];
379   std::string gNodeB = "TEST_GNOBDE";
380
381   std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
382   node_buffer[gNodeB.length()] = '\0';
383   
384
385   //====================================
386   
387   SECTION("Verify behaviour if no listener "){
388     std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl;
389     
390     int num_sources = 10;
391     std::vector<int> status_vector(num_sources, 0);
392     subscription_manager.clear();
393     
394     std::vector<std::thread> source_list;
395     
396     for(int i = 0; i < num_sources; i++){
397       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_fail), 0));
398     }
399     
400     for(int i = 0; i < num_sources; i++){
401       source_list[i].join();
402       REQUIRE(status_vector[i] == SUBSCR_ERR_TX);
403     }
404     
405     REQUIRE(subscription_manager.num_complete() == 0);
406     REQUIRE(subscription_manager.num_pending() == 0);
407   }
408
409   SECTION("Verify behaviour if listener does not respond"){
410     std::cout <<"+++++++++" << std::endl << "TEST WHEN LISTENER DOES NOT RESPOND " << std::endl;
411     
412     int num_sources = 10;
413     std::vector<int> status_vector(num_sources, 0);
414
415     subscription_manager.clear();
416
417     std::vector<std::thread> source_list;
418     
419     for(int i = 0; i < num_sources; i++){
420       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_silent), 0));
421     }
422     
423     for(int i = 0; i < num_sources; i++){
424       source_list[i].join();
425     }
426
427     for(int i = 0; i < num_sources; i++){
428       REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT);
429     }
430
431     REQUIRE(subscription_manager.num_complete() == 0);
432     REQUIRE(subscription_manager.num_pending() == 0);
433     
434   }
435   
436   SECTION("Verify timeout behaviour if listener does not response"){
437     std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR  " << std::endl;
438     
439     int res;
440     int num_sources = 1;
441     int timeout_val = 2;
442     int num_tries = 1;
443     std::vector<int> status_vector(num_sources, 0);
444      
445     subscription_manager.clear();
446     subscription_manager.set_timeout(timeout_val);
447     subscription_manager.set_num_retries(num_tries);
448     
449     auto start = std::chrono::steady_clock::now();
450     send_request(subscription_manager, status_vector, 0, mock_silent, 0);
451     auto end = std::chrono::steady_clock::now();
452
453     auto diff = end - start;
454     
455     REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT);
456     REQUIRE(diff.count() >= num_tries * timeout_val);
457
458     num_tries = 2;
459     subscription_manager.set_num_retries(num_tries);
460     status_vector.clear();
461
462     start = std::chrono::steady_clock::now();
463     send_request(subscription_manager, status_vector, 0, mock_silent, 0);
464     end = std::chrono::steady_clock::now();
465
466     diff = end - start;
467     
468     REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT);
469     REQUIRE(diff.count() >= num_tries * timeout_val);
470
471   }
472
473   SECTION("Verify rejection of illegal pdus"){
474     std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl;
475     subscription_helper subscription_info;
476     subscription_response_helper subscription_response_info;
477
478     subscription_manager.clear();
479     int res;
480     int function_id = 60000;
481     int action_id = 4;
482     int action_type = 0;
483     
484     int request_id = 1;
485     int req_seq = 1;
486     int message_type = 1;
487     int procedure_code = 27;
488
489     unsigned char event_buf[] = "Hello world";
490     size_t event_buf_len = strlen((const char *)event_buf);
491     
492     subscription_info.clear();
493     subscription_info.set_request(request_id, req_seq);
494     subscription_info.set_function_id(function_id);
495     subscription_info.add_action(action_id, action_type);
496     subscription_info.set_event_def(&event_buf[0], event_buf_len);
497
498
499     auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 0);
500     res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
501     REQUIRE(res == SUBSCR_ERR_ENCODE);
502
503     
504
505   }
506   
507   SECTION("Verify subscription request/response fail"){
508     std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION FAILURE " << std::endl;
509     
510     subscription_manager.clear();
511     
512     int num_sources = 20;
513     int num_sinks = 5;
514     
515     std::vector<std::thread> source_list;
516     std::vector<std::thread> sink_list;
517
518     std::vector<int> status_vector(num_sources, 0);
519
520     // start up the sinks
521     is_running = true;
522     for(int i = 0; i < num_sinks; i++){
523       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
524     }
525     
526     for(int i = 0; i < num_sources; i++){
527       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), -1));
528     }
529
530     
531     for(int i = 0; i < num_sources; i++){
532       source_list[i].join();
533       REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL);
534     }
535
536     // stop the sinks
537     is_running =false;
538     for(int i = 0; i < num_sinks; i++){
539       sink_list[i].join();
540     }
541       
542     REQUIRE(subscription_manager.num_complete() == 0);
543     REQUIRE(subscription_manager.num_pending() == 0);
544
545   }
546
547    SECTION("Verify subscription request/response success"){
548      std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl;
549     
550
551      subscription_manager.clear();
552     
553      int num_sources = 10;
554      int num_sinks = 5;
555      
556      std::vector<std::thread> source_list;
557      std::vector<std::thread> sink_list;
558      
559      std::vector<int> status_vector(num_sources, 0);
560
561      // Test null cases in queries
562      REQUIRE(subscription_manager.is_subscription_entry(10) == false);
563      REQUIRE(subscription_manager.is_request_entry(1) == false);
564      REQUIRE(subscription_manager.get_request_status(1) == -1);
565      
566      // start up the sinks
567      is_running = true;
568      for(int i = 0; i < num_sinks; i++){
569        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
570      }
571      
572      for(int i = 0; i < num_sources; i++){
573        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
574      }
575      
576      
577      for(int i = 0; i < num_sources; i++){
578        source_list[i].join();
579        REQUIRE(status_vector[i] < 0);
580        REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
581      }
582      
583      // stop the sinks
584      is_running =false;
585      for(int i = 0; i < num_sinks; i++){
586        sink_list[i].join();
587      }
588      
589      REQUIRE(subscription_manager.num_complete() == num_sources);
590      REQUIRE(subscription_manager.num_pending() == 0);
591
592      const subscription_response_helper *  sub_info = subscription_manager.get_subscription(-1);
593      REQUIRE(sub_info == NULL);
594
595      sub_info = subscription_manager.get_subscription(-1 * status_vector[0]);
596      REQUIRE(sub_info != NULL);
597      REQUIRE(sub_info->get_request_id() == -1 * status_vector[0]);
598      
599    }
600
601    SECTION("Delete requests for non-existent subscription requests"){
602      std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl;
603     
604      subscription_manager.clear();
605      REQUIRE(subscription_manager.get_request_status(0) == -1);
606      REQUIRE(subscription_manager.is_subscription_entry(0) == false);
607      REQUIRE(subscription_manager.is_request_entry(0) == false);
608      
609      int num_sources = 10;
610      
611      std::vector<std::thread> source_list;   
612      std::vector<int> status_vector(num_sources, 0);
613      srand(100);
614      for(int i = 0; i < num_sources; i++){
615        int req_id = rand()%1000;
616        source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i,req_id ,  std::ref(mock_tx), 0));
617      }
618      
619      
620      for(int i = 0; i < num_sources; i++){
621        source_list[i].join();
622        REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING);
623      }
624      
625      
626    }
627
628    
629
630    SECTION("Delete requests that have succeeeded"){
631      std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl;
632     
633      subscription_manager.clear();
634     
635      int num_sources = 10;
636      int num_sinks = 5;
637      const subscription_response_helper * sub_resp_info;
638      
639      std::vector<std::thread> source_list;
640      std::vector<std::thread> sink_list;
641      
642      std::vector<int> status_vector(num_sources, 0);
643      
644      // start up the sinks
645      is_running = true;
646      for(int i = 0; i < num_sinks; i++){
647        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
648      }
649
650      // First do subscriptions ...
651      for(int i = 0; i < num_sources; i++){
652        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
653      }
654      
655      
656      for(int i = 0; i < num_sources; i++){
657        source_list[i].join();
658        REQUIRE(status_vector[i] < 0 );
659        REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
660        sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
661        REQUIRE(sub_resp_info != NULL);
662        REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
663        
664      }
665
666      REQUIRE(subscription_manager.num_complete() == num_sources);
667      REQUIRE(subscription_manager.num_pending() == 0);
668
669
670      // Store ids ..
671      std::vector<int> completed_requests = status_vector;
672      
673      // Delete successes
674      source_list.clear();
675      for(int i = 0; i < num_sources; i++){
676        source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i],  std::ref(mock_tx), 0));
677      }
678      
679      
680      for(int i = 0; i < num_sources; i++){
681        source_list[i].join();
682        REQUIRE(status_vector[i] == SUBSCR_SUCCESS);
683      }
684
685      REQUIRE(subscription_manager.num_pending() == 0);
686
687
688      // stop the sinks
689      is_running =false;
690      for(int i = 0; i < num_sinks; i++){
691        sink_list[i].join();
692      }
693      REQUIRE(subscription_manager.num_complete() == 0);     
694      REQUIRE(subscription_manager.num_pending() == 0);
695   
696    }
697
698    SECTION("Deletes that fail"){
699
700      std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl;
701     
702      subscription_manager.clear();
703     
704      int num_sources = 10;
705      int num_sinks = 5;
706      const subscription_response_helper * sub_resp_info;
707      
708      std::vector<std::thread> source_list;
709      std::vector<std::thread> sink_list;   
710      std::vector<int> status_vector(num_sources, 0);
711      
712      // start up the sinks
713      is_running = true;
714      for(int i = 0; i < num_sinks; i++){
715        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
716      }
717
718      // First do subscriptions ...
719      for(int i = 0; i < num_sources; i++){
720        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
721      }
722      
723      
724      for(int i = 0; i < num_sources; i++){
725        source_list[i].join();
726        REQUIRE(status_vector[i] < 0 );
727        REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
728        sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
729        REQUIRE(sub_resp_info != NULL);
730        REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
731        
732      }
733
734      REQUIRE(subscription_manager.num_complete() == num_sources);
735      REQUIRE(subscription_manager.num_pending() == 0);
736
737
738      // Store ids ..
739      std::vector<int> completed_requests = status_vector;
740     
741      // Delete failures
742      source_list.clear();
743      for(int i = 0; i < num_sources; i++){
744        source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i],  std::ref(mock_tx), 1));
745      }
746
747      for(int i = 0; i < num_sources; i++){
748        source_list[i].join();
749        REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL);
750      }
751
752
753      // stop the sinks
754      is_running = false;
755      for(int i = 0; i < num_sinks; i++){
756        sink_list[i].join();
757      }
758      
759      // subscriptions are still there
760      REQUIRE(subscription_manager.num_complete() == num_sources);
761      REQUIRE(subscription_manager.num_pending() == 0);
762
763      
764    }
765
766    SECTION("Deletes that timed out "){
767
768      std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT TIMEOUT " << std::endl;
769      
770      subscription_manager.clear();
771     
772      int num_sources = 10;
773      int num_sinks = 5;
774      const subscription_response_helper * sub_resp_info;
775      
776      std::vector<std::thread> source_list;
777      std::vector<std::thread> sink_list;
778      
779      std::vector<int> status_vector(num_sources, 0);
780      
781      // start up the sinks
782      is_running = true;
783      for(int i = 0; i < num_sinks; i++){
784        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
785      }
786
787      // First do subscriptions ...
788      for(int i = 0; i < num_sources; i++){
789        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
790      }
791      
792      
793      for(int i = 0; i < num_sources; i++){
794        source_list[i].join();
795        REQUIRE(status_vector[i] <  0 );
796        REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
797        sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
798        REQUIRE(sub_resp_info != NULL);
799        REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
800        
801      }
802
803      REQUIRE(subscription_manager.num_complete() == num_sources);
804      REQUIRE(subscription_manager.num_pending() == 0);
805
806
807      // Store ids ..
808      std::vector<int> completed_requests = status_vector;
809           
810      // Delete with  time-outs 
811      source_list.clear();
812      for(int i = 0; i < num_sources; i++){
813        source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i],  std::ref(mock_silent), 0));
814      }
815      
816      
817      for(int i = 0; i < num_sources; i++){
818        source_list[i].join();
819        REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT);
820      }
821
822      // stop the sinks
823      is_running = false;
824      for(int i = 0; i < num_sinks; i++){
825        sink_list[i].join();
826      }
827     
828      REQUIRE(subscription_manager.num_complete() == num_sources);
829      REQUIRE(subscription_manager.num_pending() == 0);
830
831    }
832    
833    SECTION("Spurious messages"){
834      std::cout <<"+++++++++" << std::endl << "TEST WITH SPURIOUS RESPONSES" << std::endl;
835    
836      // In this section, we basically inject
837      // spurious messages to subscription handler.
838      // There are no outcomes. basically
839      // handler should be able to ignore these messages
840
841      int num_packets = 50;
842      int num_sinks = 10;
843      std::vector<std::thread> sink_list;
844      
845      subscription_manager.clear();
846      std::cout <<"Message queue size prior to fill  = " << message_bus.size() << std::endl;
847      random_tx(num_packets);
848      std::cout <<"Message queue size post  fill  = " << message_bus.size() << std::endl;
849
850
851      // start up the sinks
852      is_running = true;
853      for(int i = 0; i < num_sinks; i++){
854        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
855      }
856
857      // wait for queue to drain out
858      while(! message_bus.empty()){
859        sleep(2);
860      }
861      
862      // stop the sinks
863      is_running =false;
864      for(int i = 0; i < num_sinks; i++){
865        sink_list[i].join();
866      }
867      REQUIRE(subscription_manager.num_complete() == 0);     
868      REQUIRE(subscription_manager.num_pending() == 0);
869
870      std::cout <<"Message queue size at end  = " << message_bus.size() << std::endl;
871
872    }
873    
874 };
875
876    
877