User story RICPLT-2620
[ric-app/admin.git] / test / unit_test_subscription_flow.cc
1 /*==================================================================================
2
3         Copyright (c) 2018-2019 AT&T Intellectual Property.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 /* Author : Ashwin Sridharan
20    Date   : Feb 2019
21 */
22
23
24 /* 
25    Unit testing of subscription handler
26 */
27
28 #define CATCH_CONFIG_MAIN
29 #include <catch2/catch.hpp>
30
31
32 #include <string.h>
33 #include <stdio.h>
34 #include <iostream>
35 #include <csignal>
36 #include <chrono>
37 #include <subscription_handler.hpp>
38 #include <e2sm.hpp>
39 #include <queue>
40 #include <mutex>
41 #include <rmr/RIC_message_types.h>
42 #include <thread>
43
44
45 // globally list gnodeb-id we use
46 std::string gNodeBID = "abc123";
47
48 // global queue for testing
49 // acts like a channel 
50 std::queue<std::pair<std::string, std::string>> message_bus;
51
52 // global lock for testing
53 std::mutex get_object ;
54
55 bool is_running = true;
56
57
58 // ==================================================
59 // various mock transmission functions that simulate underlying
60 // transmission layer behaviour
61
62 // this function immediately fails
63 // simulates a channel that is not available
64 bool mock_fail(int mtype,  size_t len,  void * payload, std::string gNodeB_id, int mode){
65   return false;
66 }
67
68 // silently returns without actually doing any transmission
69 // simulates a lost transmission
70 bool mock_silent(int mtype,  size_t len, void * payload, std::string gNodeB_id, int mode){
71   return true;
72 }
73
74 // simulates a working transmission channel 
75 bool mock_tx(int mytpe,  size_t len,  void *payload, std::string gNodeB_id, int mode){
76
77   bool res;
78   int i;
79   subscription_helper he;
80   subscription_response_helper he_resp;
81   
82   subscription_request sub_req;
83   subscription_response sub_resp;
84
85   subscription_delete sub_del_req;
86   subscription_delete_response sub_del_resp;
87   asn_dec_rval_t retval;
88
89   E2N_E2AP_PDU_t * e2ap_pdu_recv;
90   unsigned char buffer[256];
91   size_t buf_size = 256;
92   bool msg_ok = false;
93   
94   e2ap_pdu_recv = 0;
95   retval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_pdu_recv), payload, len);
96   if(retval.code != RC_OK){
97     std::cerr <<"Error decoding E2N_E2AP Subscription response PDU. Reason = " << strerror(errno) << std::endl;
98     ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
99     return false;
100   }
101   
102   int procedure_code = e2ap_pdu_recv->choice.initiatingMessage->procedureCode;
103   
104   if(procedure_code == E2N_ProcedureCode_id_ricSubscription){
105
106     he.clear();
107     sub_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he);
108     
109  
110     // set up response object
111     he_resp.set_request(he.get_request_id(), he.get_req_seq());
112     he_resp.set_function_id(he.get_function_id());
113     i = 0;
114
115     // we simply copy over actions to both admitted and not
116     // admitted list for now ..
117     // in future, may need to be more selective
118     for(auto &e : *(he.get_list())){
119         he_resp.add_action(e.get_id());
120         he_resp.add_action(e.get_id(), 1, 2);
121       i++;
122     }
123     
124     if(mode == 0){
125       res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size,  he_resp, true);
126       if (!res){
127         std::cerr << "Error encoding subscription response successful. Reason = " << sub_resp.get_error() << std::endl;
128       }
129       else{
130         msg_ok = true;
131       }
132     }
133     else{
134       res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size, he_resp, false);
135       if (!res){
136         std::cerr << "Error encoding subscription response failure . Reason = " << sub_resp.get_error() << std::endl;
137       }
138       else{
139         msg_ok = true;
140       }
141
142     };
143   
144   }
145
146   else if (procedure_code == E2N_ProcedureCode_id_ricSubscriptionDelete){
147
148     he.clear();
149     sub_del_req.get_fields(e2ap_pdu_recv->choice.initiatingMessage, he);
150     
151     // set up response object
152     he_resp.clear();
153     he_resp.set_request(he.get_request_id(), he.get_req_seq());
154     he_resp.set_function_id(he.get_function_id());
155     if(mode == 0){
156       res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, true);
157       if (!res){
158         std::cerr << "Error encoding subscription delete  response failure . Reason = " << sub_resp.get_error() << std::endl;
159       }
160       else{
161         msg_ok = true;
162       }
163     }
164     else{
165       res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, false);
166       if (!res){
167         std::cerr << "Error encoding subscription delete  response failure . Reason = " << sub_resp.get_error() << std::endl;
168       }
169       else{
170         msg_ok = true;
171         std::cout <<"Sending delete failures ..." << std::endl;
172       }
173       
174     }
175   }
176   else{
177     std::cout <<"Illegal request" << std::endl;
178   }
179
180   
181   // push to queue
182   if(msg_ok){
183     std::lock_guard<std::mutex> guard(get_object);
184     std::string msg((char *)buffer, buf_size);
185     //std::cout <<"Pushed to queue" << std::endl;
186     message_bus.push(std::make_pair(gNodeB_id, msg));
187   }
188
189   
190   ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
191   if(msg_ok)
192     return true;  
193   else
194     return false;
195     
196     
197 }
198
199
200
201
202 // simulates response :takes what is in the queue, processes it and then invokes
203 // subscription_handler response 
204 void  mock_RAN (subscription_handler &_ref_sub_handler, int delay){
205
206   // Behaviour :
207   std::string gNodeB_id;
208   unsigned char incorrect_e2ap[128];
209   size_t incorrect_e2ap_size = 128;
210   for(int i = 0; i < 128; i++){
211     incorrect_e2ap[i] = 'b';
212   }
213   
214   FILE *pfile = fopen("test-data/e2ap_indication_test.per", "r");
215   if(pfile == NULL){
216     std::cout <<  "Error opening e2ap_indication_test.per" << std::endl;
217     exit(-1);
218   }
219   unsigned char e2ap_msg[512];
220   size_t e2ap_msg_size = fread((char *)e2ap_msg, sizeof(char), 512, pfile);
221   fclose(pfile);
222
223   unsigned char message_buf[512];
224   std::pair<std::string, std::string> pdu;
225
226   bool is_resp;
227   while(is_running){
228     
229     // test illegal  packet response : send some random data, i.e incorrect E2AP
230     _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size, gNodeBID.c_str());
231     //std::cout <<"Sent random data to subscription handler" << std::endl;
232     
233     //test incorrect packet response :  send an E2AP which is not subscription request
234     _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size, gNodeBID.c_str());
235     //std::cout <<"Sent incorrect e2ap  to subscription handler" << std::endl;
236     
237     // now look in the queue, pop it and send the data
238     // finally send correct payload if queue not empty
239     is_resp = false;
240     {
241       std::lock_guard<std::mutex> guard(get_object);
242       if(! message_bus.empty()){
243         pdu  = message_bus.front();
244         gNodeB_id = pdu.first;
245         memcpy(message_buf,  pdu.second.c_str(), pdu.second.length());
246         message_bus.pop();
247         is_resp =true;
248       }
249     }
250
251     if(is_resp){
252       sleep(delay);
253       _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.second.length(), gNodeB_id.c_str());
254       //std::cout <<"Sent  response to subscription handler" << std::endl;
255     }
256          
257   }
258   
259 }
260
261 // wrapper function that we use to test sending subscriptions with various channels
262 void send_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, std::vector<std::string> & gNodeBs, int index, bool (*tx)(int,  size_t, void *,  std::string, int), int mode ){
263
264   subscription_helper subscription_info;
265   subscription_request sub_req;
266   subscription_response_helper subscription_response_info;
267
268   int function_id = 0;
269   int action_id = 4;
270   int action_type = 0;
271
272   int request_id = 1;
273   int req_seq = 1;
274   int message_type = 1;
275   int procedure_code = 27;
276   std::string egnb_id = "Testgnb";
277   std::string plmn_id = "Testplmn";
278   
279   unsigned char event_buf[128];
280   size_t event_buf_len = 128;
281   int res;    
282
283
284   e2sm_event_trigger_helper trigger_data;
285   e2sm_event_trigger event_trigger;
286   
287   trigger_data.egNB_id = egnb_id;
288   trigger_data.plmn_id = plmn_id;
289   trigger_data.egNB_id_type = 2;
290   trigger_data.interface_direction = 1;
291   trigger_data.procedure_code = procedure_code;
292   trigger_data.message_type = message_type;
293
294   res = event_trigger.encode_event_trigger(&event_buf[0], &event_buf_len, trigger_data);
295   
296   subscription_info.clear();
297   subscription_info.set_request(request_id, req_seq);
298   subscription_info.set_function_id(function_id);
299   subscription_info.add_action(action_id, action_type);
300   subscription_info.set_event_def(&event_buf[0], event_buf_len);
301
302   auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index],  mode);
303   res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBs[index], RIC_SUB_REQ, transmitter);
304   
305   if (res == SUBSCR_SUCCESS ){
306     status_vector[index] = -1 ;
307   }
308   else{
309     status_vector[index] = res;
310   }
311
312   std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl;
313 }
314
315
316 // wrapper function that we use to test sending delete requests with various channels
317 void delete_request(subscription_handler &subscription_manager, std::vector<int> & status_vector,  std::vector<std::string> & gNodeBs, int index, int request_id, bool ( *tx)(int,  size_t, void *, std::string, int), int mode ){
318
319   subscription_helper subscription_info;
320   subscription_response_helper subscription_response_info;
321
322  
323   //verify subscription deleted
324   subscription_info.set_request(0, 0);
325   subscription_info.set_function_id(0);
326
327   auto transmitter = std::bind(tx,  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index], mode);
328   status_vector[index] = subscription_manager.request_subscription_delete(subscription_info, subscription_response_info, gNodeBs[index], RIC_SUB_DEL_REQ,  transmitter);
329   
330          
331 };
332
333
334 TEST_CASE("Test various channel responses", "E2AP Subscription"){
335
336   subscription_handler subscription_manager;
337     
338   mdclog_attr_t *attr;
339   mdclog_attr_init(&attr);
340   mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
341   mdclog_init(attr);
342   mdclog_level_set(MDCLOG_INFO);
343   mdclog_attr_destroy(attr);
344
345   //====================================  
346   SECTION("Verify behaviour if no listener "){
347     std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl;
348     
349     int num_sources = 10;
350     std::vector<int> status_vector(num_sources, 0);
351     subscription_manager.clear();
352     
353     std::vector<std::thread> source_list;
354     std::vector<std::string> gNodeBs;
355     for(int i = 0; i < num_sources; i++){
356       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
357     }
358     
359     for(int i = 0; i < num_sources; i++){
360       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_fail), 0));
361     }
362     
363     for(int i = 0; i < num_sources; i++){
364       source_list[i].join();
365       REQUIRE(status_vector[i] == SUBSCR_ERR_TX);
366     }
367     
368     REQUIRE(subscription_manager.num_complete() == 0);
369     REQUIRE(subscription_manager.num_pending() == 0);
370   }
371
372   SECTION("Verify behaviour if listener does not respond"){
373     std::cout <<"+++++++++" << std::endl << "TEST WHEN LISTENER DOES NOT RESPOND " << std::endl;
374     
375     int num_sources = 10;
376     std::vector<int> status_vector(num_sources, 0);
377
378     subscription_manager.clear();
379
380     std::vector<std::thread> source_list;
381         std::vector<std::string> gNodeBs;
382     for(int i = 0; i < num_sources; i++){
383       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
384     }
385
386     for(int i = 0; i < num_sources; i++){
387       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_silent), 0));
388     }
389     
390     for(int i = 0; i < num_sources; i++){
391       source_list[i].join();
392     }
393
394     for(int i = 0; i < num_sources; i++){
395       REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT);
396     }
397
398     REQUIRE(subscription_manager.num_complete() == 0);
399     REQUIRE(subscription_manager.num_pending() == 0);
400     
401   }
402   
403 }
404
405
406 TEST_CASE("Test config", "E2AP Subscription"){
407
408   subscription_handler subscription_manager;
409     
410   mdclog_attr_t *attr;
411   mdclog_attr_init(&attr);
412   mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
413   mdclog_init(attr);
414   mdclog_level_set(MDCLOG_INFO);
415   mdclog_attr_destroy(attr);
416
417   SECTION("Verify timeout behaviour if listener does not response"){
418     std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR  " << std::endl;
419     
420     int res;
421     int num_sources = 1;
422     int timeout_val = 2;
423     int num_tries = 1;
424     std::vector<int> status_vector(num_sources, 0);
425
426     std::vector<std::string> gNodeBs;
427     for(int i = 0; i < num_sources; i++){
428       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
429     }
430
431     subscription_manager.clear();
432     subscription_manager.set_timeout(timeout_val);
433     subscription_manager.set_num_retries(num_tries);
434     
435     auto start = std::chrono::steady_clock::now();
436     send_request(subscription_manager, status_vector, gNodeBs, 0, mock_silent, 0);
437     auto end = std::chrono::steady_clock::now();
438
439     auto diff = end - start;
440     
441     REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT);
442     REQUIRE(diff.count() >= num_tries * timeout_val);
443
444     num_tries = 2;
445     subscription_manager.set_num_retries(num_tries);
446     status_vector.clear();
447
448     start = std::chrono::steady_clock::now();
449     send_request(subscription_manager, status_vector, gNodeBs,  0, mock_silent, 0);
450     end = std::chrono::steady_clock::now();
451
452     diff = end - start;
453     
454     REQUIRE(status_vector[0] == SUBSCR_ERR_TIMEOUT);
455     REQUIRE(diff.count() >= num_tries * timeout_val);
456
457   }
458
459 }
460
461 TEST_CASE("Test sunny day scenarios", "E2AP Subscription"){
462
463   subscription_handler subscription_manager;
464     
465   mdclog_attr_t *attr;
466   mdclog_attr_init(&attr);
467   mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
468   mdclog_init(attr);
469   mdclog_level_set(MDCLOG_INFO);
470   mdclog_attr_destroy(attr);
471
472   SECTION("Verify subscription request/response success"){
473     std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl;    
474     
475     subscription_manager.clear();
476     
477     int num_sources = 10;
478     int num_sinks = 5;
479     
480     std::vector<std::thread> source_list;
481     std::vector<std::thread> sink_list;
482     
483     std::vector<int> status_vector(num_sources, 0);
484     
485     // First Test null cases in queries,i.e non-existing request 
486     subscription_identifier id = std::make_tuple (gNodeBID, 10);
487      REQUIRE(subscription_manager.is_subscription_entry(id) == false);
488      id = std::make_tuple(gNodeBID, 1);
489      REQUIRE(subscription_manager.is_request_entry(id) == false);
490      REQUIRE(subscription_manager.get_request_status(id) == -1);
491      
492      // start up the sinks
493      is_running = true;
494      for(int i = 0; i < num_sinks; i++){
495        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
496      }
497
498      // generate the gnodeb list for which we are subscribing
499      // default ran_function_id is zero 
500      std::vector<std::string> gNodeBs;
501      for(int i = 0; i < num_sources; i++){
502        gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
503      }
504      
505      for(int i = 0; i < num_sources; i++){
506        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_tx), 0));
507      }
508      
509      
510      for(int i = 0; i < num_sources; i++){
511        source_list[i].join();
512        REQUIRE(status_vector[i] < 0);
513        id = std::make_tuple(gNodeBs[i], 0);
514        REQUIRE(subscription_manager.is_subscription_entry(id) == true);
515      }
516      
517      // stop the sinks
518      is_running =false;
519      for(int i = 0; i < num_sinks; i++){
520        sink_list[i].join();
521      }
522      
523      REQUIRE(subscription_manager.num_complete() == num_sources);
524      REQUIRE(subscription_manager.num_pending() == 0);
525      
526      // test getting subscription :
527      // case 1: fake request
528      id = std::make_tuple(gNodeBID, 0);
529      const subscription_response_helper *  sub_info = subscription_manager.get_subscription(id);
530      REQUIRE(sub_info == NULL);
531
532      // case 2: valid request : get all the keys and use them
533      std::vector<subscription_identifier> key_list;
534      subscription_manager.get_subscription_keys(key_list);
535      REQUIRE(key_list.size() == subscription_manager.num_complete());
536      for(auto &e: key_list){
537        sub_info = subscription_manager.get_subscription(e);
538        REQUIRE(sub_info != NULL);
539      }
540    }
541
542
543
544   SECTION("Delete requests that have succeeeded"){
545     std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl;
546     
547     subscription_manager.clear();
548     
549     int num_sources = 10;
550     int num_sinks = 5;
551     const subscription_response_helper * sub_resp_info;
552     
553     std::vector<std::thread> source_list;
554     std::vector<std::thread> sink_list;
555     
556     std::vector<int> status_vector(num_sources, 0);
557     std::vector<std::string> gNodeBs;
558     
559     // generate the gnodeb list for which we are subscribing
560     // default ran_function_id is zero 
561     for(int i = 0; i < num_sources; i++){
562       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
563     }
564     
565     // start up the sinks
566     is_running = true;
567     for(int i = 0; i < num_sinks; i++){
568       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
569     }
570     
571     // First do subscriptions ...
572     for(int i = 0; i < num_sources; i++){
573       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_tx), 0));
574     }
575     
576     
577     for(int i = 0; i < num_sources; i++){
578       source_list[i].join();
579       REQUIRE(status_vector[i] < 0 );
580       subscription_identifier id = std::make_tuple (gNodeBs[i], 0);       
581       REQUIRE(subscription_manager.is_subscription_entry(id) == true);
582       sub_resp_info = subscription_manager.get_subscription(id);
583       REQUIRE(sub_resp_info != NULL);
584       REQUIRE(sub_resp_info->get_request_id() == 0);
585       
586     }
587     
588     REQUIRE(subscription_manager.num_complete() == num_sources);
589     REQUIRE(subscription_manager.num_pending() == 0);
590     
591     
592     // Store ids ..
593     std::vector<int> completed_requests = status_vector;
594     
595     // Delete successes
596     source_list.clear();
597     for(int i = 0; i < num_sources; i++){
598       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i],  std::ref(mock_tx), 0));
599     }
600     
601     
602     for(int i = 0; i < num_sources; i++){
603       source_list[i].join();
604       REQUIRE(status_vector[i] == SUBSCR_SUCCESS);
605     }
606     
607     REQUIRE(subscription_manager.num_pending() == 0);
608     
609     
610     // stop the sinks
611     is_running =false;
612     for(int i = 0; i < num_sinks; i++){
613       sink_list[i].join();
614     }
615     
616     REQUIRE(subscription_manager.num_complete() == 0);     
617     REQUIRE(subscription_manager.num_pending() == 0);
618     
619   }
620
621   
622 }
623
624 TEST_CASE("Test rainy day scenarios", "E2AP Subscription"){
625
626   subscription_handler subscription_manager;
627     
628   mdclog_attr_t *attr;
629   mdclog_attr_init(&attr);
630   mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
631   mdclog_init(attr);
632   mdclog_level_set(MDCLOG_INFO);
633   mdclog_attr_destroy(attr);
634
635   SECTION("Verify rejection of illegal pdus"){
636
637     std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl;
638     subscription_helper subscription_info;
639     subscription_response_helper subscription_response_info;
640
641     subscription_manager.clear();
642     int res;
643     int function_id = 60000;
644     int action_id = 4;
645     int action_type = 0;
646     
647     int request_id = 1;
648     int req_seq = 1;
649
650     unsigned char event_buf[] = "Hello world";
651     size_t event_buf_len = strlen((const char *)event_buf);
652     
653     subscription_info.clear();
654     subscription_info.set_request(request_id, req_seq);
655     subscription_info.set_function_id(function_id);
656     subscription_info.add_action(action_id, action_type);
657     subscription_info.set_event_def(&event_buf[0], event_buf_len);
658
659     std::vector<std::string> gNodeBs;
660     auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBID, 0);
661     res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBID, RIC_SUB_REQ, transmitter);
662     REQUIRE(res == SUBSCR_ERR_ENCODE);
663
664   }
665   
666   SECTION("Verify subscription request/response fail"){
667     std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION FAILURE " << std::endl;
668     
669     subscription_manager.clear();
670     
671     int num_sources = 20;
672     int num_sinks = 5;
673     
674     std::vector<std::thread> source_list;
675     std::vector<std::thread> sink_list;
676
677     std::vector<int> status_vector(num_sources, 0);
678     
679     // generate the gnodeb list for which we are subscribing
680     // default ran_function_id is zero 
681     std::vector<std::string> gNodeBs;
682     for(int i = 0; i < num_sources; i++){
683       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
684     }
685
686     // start up the sinks
687     is_running = true;
688     for(int i = 0; i < num_sinks; i++){
689       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
690     }
691     
692     for(int i = 0; i < num_sources; i++){
693       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), -1));
694     }
695
696     
697     for(int i = 0; i < num_sources; i++){
698       source_list[i].join();
699       REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL);
700     }
701
702     // stop the sinks
703     is_running =false;
704     for(int i = 0; i < num_sinks; i++){
705       sink_list[i].join();
706     }
707       
708     REQUIRE(subscription_manager.num_complete() == 0);
709     REQUIRE(subscription_manager.num_pending() == 0);
710
711   }
712
713   SECTION("Delete requests for non-existent subscription requests"){
714     std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl;
715     
716     subscription_manager.clear();
717     subscription_identifier id = std::make_tuple (gNodeBID, 0); 
718     REQUIRE(subscription_manager.get_request_status(id) == -1);
719     REQUIRE(subscription_manager.is_subscription_entry(id) == false);
720     REQUIRE(subscription_manager.is_request_entry(id) == false);
721     
722      int num_sources = 10;
723      
724      std::vector<std::thread> source_list;   
725      std::vector<int> status_vector(num_sources, 0);
726      srand(100);
727      std::vector<std::string> gNodeBs;
728
729      // generate the gnodeb list for which we are subscribing
730      // default ran_function_id is zero 
731      for(int i = 0; i < num_sources; i++){
732        gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
733      }
734
735      for(int i = 0; i < num_sources; i++){
736        int req_id = rand()%1000;
737        source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i,req_id ,  std::ref(mock_tx), 0));
738      }
739      
740      
741      for(int i = 0; i < num_sources; i++){
742        source_list[i].join();
743        REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING);
744      }
745      
746    }
747
748
749    
750    SECTION("Deletes that fail"){
751
752      std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl;
753     
754      subscription_manager.clear();
755     
756      int num_sources = 10;
757      int num_sinks = 5;
758      const subscription_response_helper * sub_resp_info;
759      
760      std::vector<std::thread> source_list;
761      std::vector<std::thread> sink_list;   
762      std::vector<int> status_vector(num_sources, 0);
763      
764      // start up the sinks
765      is_running = true;
766      for(int i = 0; i < num_sinks; i++){
767        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
768      }
769
770      // generate the gnodeb list for which we are subscribing
771      // default ran_function_id is zero 
772      std::vector<std::string> gNodeBs;
773      for(int i = 0; i < num_sources; i++){
774        gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
775      }
776
777      // First do subscriptions ...
778      for(int i = 0; i < num_sources; i++){
779        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
780      }
781      
782      
783      for(int i = 0; i < num_sources; i++){
784        source_list[i].join();
785        REQUIRE(status_vector[i] < 0 );
786        subscription_identifier id = std::make_tuple(gNodeBs[i], 0); 
787        REQUIRE(subscription_manager.is_subscription_entry(id) == true);
788        sub_resp_info = subscription_manager.get_subscription(id);
789        REQUIRE(sub_resp_info != NULL);
790        REQUIRE(sub_resp_info->get_request_id() == 0);
791        
792      }
793
794      REQUIRE(subscription_manager.num_complete() == num_sources);
795      REQUIRE(subscription_manager.num_pending() == 0);
796
797
798      // Store status results 
799      std::vector<int> completed_requests = status_vector;
800     
801      // Delete failures : mock_tx set to respond with failure
802      source_list.clear();
803      for(int i = 0; i < num_sources; i++){
804        source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i],  std::ref(mock_tx), 1));
805      }
806
807      for(int i = 0; i < num_sources; i++){
808        source_list[i].join();
809        REQUIRE(status_vector[i] == SUBSCR_ERR_FAIL);
810      }
811
812
813      // stop the sinks
814      is_running = false;
815      for(int i = 0; i < num_sinks; i++){
816        sink_list[i].join();
817      }
818      
819      // subscriptions are still there (did not get deleted)
820      REQUIRE(subscription_manager.num_complete() == num_sources);
821      REQUIRE(subscription_manager.num_pending() == 0);
822
823      
824    }
825
826    SECTION("Deletes that timed out "){
827
828      std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT TIMEOUT " << std::endl;
829      
830      subscription_manager.clear();
831     
832      int num_sources = 10;
833      int num_sinks = 5;
834      const subscription_response_helper * sub_resp_info;
835      
836      std::vector<std::thread> source_list;
837      std::vector<std::thread> sink_list;
838      
839      std::vector<int> status_vector(num_sources, 0);
840      std::vector<std::string> gNodeBs;
841
842      // generate the gnodeb list for which we are subscribing
843      // default ran_function_id is zero 
844      for(int i = 0; i < num_sources; i++){
845        gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
846      }
847      
848      // start up the sinks
849      is_running = true;
850      for(int i = 0; i < num_sinks; i++){
851        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
852      }
853
854      // First do subscriptions ...
855      for(int i = 0; i < num_sources; i++){
856        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
857      }
858      
859      
860      for(int i = 0; i < num_sources; i++){
861        source_list[i].join();
862        REQUIRE(status_vector[i] <  0 );
863        subscription_identifier id = std::make_tuple(gNodeBs[i], 0); 
864        REQUIRE(subscription_manager.is_subscription_entry(id) == true);
865        sub_resp_info = subscription_manager.get_subscription(id);
866        REQUIRE(sub_resp_info != NULL);
867        REQUIRE(sub_resp_info->get_request_id() == 0);
868        
869      }
870
871      REQUIRE(subscription_manager.num_complete() == num_sources);
872      REQUIRE(subscription_manager.num_pending() == 0);
873
874
875      // Store ids ..
876      std::vector<int> completed_requests = status_vector;
877           
878      // Delete with  time-outs 
879      source_list.clear();
880      for(int i = 0; i < num_sources; i++){
881        source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i],  std::ref(mock_silent), 0));
882      }
883      
884      
885      for(int i = 0; i < num_sources; i++){
886        source_list[i].join();
887        REQUIRE(status_vector[i] == SUBSCR_ERR_TIMEOUT);
888      }
889
890      // stop the sinks
891      is_running = false;
892      for(int i = 0; i < num_sinks; i++){
893        sink_list[i].join();
894      }
895     
896      REQUIRE(subscription_manager.num_complete() == num_sources);
897      REQUIRE(subscription_manager.num_pending() == 0);
898
899    }
900    
901
902    
903   SECTION("Verify timeout behaviour if transmitter sends after delay"){
904     std::cout <<"+++++++++" << std::endl << "TEST DELAYED ARRIVAL OF SUBSCRIPTIONS " << std::endl;    
905     
906     subscription_manager.clear();
907     int num_sources = 10;
908     int num_sinks = 5;
909     
910     std::vector<std::thread> source_list;
911     std::vector<std::thread> sink_list;
912     
913     std::vector<int> status_vector(num_sources, 0);
914
915     // set subscription manager timeout on short fuse
916     int time_out = 1;
917     int num_tries = 1;
918     subscription_manager.set_timeout(time_out);
919     subscription_manager.set_num_retries(num_tries);
920
921     // start up the sinks with delayed response
922     is_running = true;
923     int delay = 5;
924     for(int i = 0; i < num_sinks; i++){
925       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), delay));
926     }
927
928     
929     // generate the gnodeb list for which we are subscribing
930     // default ran_function_id is zero 
931     std::vector<std::string> gNodeBs;
932     for(int i = 0; i < num_sources; i++){
933       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
934     }
935      
936     for(int i = 0; i < num_sources; i++){
937       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_tx), 0));
938     }
939      
940      
941     for(int i = 0; i < num_sources; i++){
942       source_list[i].join();
943       REQUIRE(status_vector[i]  == SUBSCR_ERR_TIMEOUT);
944     }
945     
946     // stop the sinks
947     is_running =false;
948     for(int i = 0; i < num_sinks; i++){
949       sink_list[i].join();
950     }
951     
952     REQUIRE(subscription_manager.num_complete() == 0);
953     REQUIRE(subscription_manager.num_pending() == 0);
954      
955   }
956   
957    SECTION("Duplicate requests"){
958      std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION REQUESTS " << std::endl;
959      
960      subscription_manager.clear();
961      
962      int num_sources = 20;
963      int num_sinks = 5;
964      
965      std::vector<std::thread> source_list;
966      std::vector<std::thread> sink_list;
967      
968      std::vector<int> status_vector(num_sources, 0);
969      
970      // generate IDENTICAL  gnodeb list for which we are subscribing
971      // default ran_function_id is zero 
972      std::vector<std::string> gNodeBs;
973      for(int i = 0; i < num_sources; i++){
974        gNodeBs.push_back("test-gnodeb-" + std::to_string(0));
975      }
976
977      // start up the sinks
978      is_running = true;
979      for(int i = 0; i < num_sinks; i++){
980        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
981      }
982      
983      // send out subscriptions 
984      for(int i = 0; i < num_sources; i++){
985        source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
986      }
987
988      // exactly ONE subscription should succeed. all others should fail with SUBSCR_ERR_DUPLICATE
989      for(int i = 0; i < num_sources; i++){
990        source_list[i].join();
991        REQUIRE( (status_vector[i] == -1  || status_vector[i] == SUBSCR_ERR_DUPLICATE));
992        
993      }
994      
995      // stop the sinks
996      is_running =false;
997      for(int i = 0; i < num_sinks; i++){
998        sink_list[i].join();
999      }
1000      
1001      REQUIRE(subscription_manager.num_complete() == 1);     
1002    }
1003
1004
1005    SECTION("Duplicate responses"){
1006      // this scenario can happen if there was an initial successful
1007      // subscription with <gnodeb-id, ran-function-id> request
1008      // followed by another one. The response for the second one should
1009      // result in a duplicate subscription error
1010
1011
1012      std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION RESPONSES" << std::endl;
1013      
1014      subscription_manager.clear();
1015
1016      int num_sources = 1;
1017      int num_sinks = 1;
1018      std::vector<int> status_vector (num_sources, 0);
1019      std::vector<std::string> gNodeBs;
1020      gNodeBs.push_back("test-gnodeb");
1021      
1022      std::vector<std::thread> sink_list;
1023      // start up the sinks
1024      is_running = true;
1025      for(int i = 0; i < num_sinks; i++){
1026        sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
1027      }
1028
1029      // send a subscription  : this should succeed
1030      send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0);
1031      REQUIRE(status_vector[0] == -1);
1032      REQUIRE(subscription_manager.num_pending() == 0);
1033      REQUIRE(subscription_manager.num_complete() == 1);
1034
1035
1036      // now send same subscription again
1037      send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0);
1038      REQUIRE(status_vector[0] == SUBSCR_ERR_DUPLICATE);
1039      REQUIRE(subscription_manager.num_pending() == 0);
1040      REQUIRE(subscription_manager.num_complete() == 1);
1041      
1042      // stop the sinks
1043      is_running =false;
1044      for(int i = 0; i < num_sinks; i++){
1045        sink_list[i].join();
1046      }
1047      
1048    }
1049      
1050 }
1051
1052    
1053