User story RICPLT-2620
[ric-app/admin.git] / test / unit_test_subscription_flow.cc
index 9c99f77..995188f 100644 (file)
 #include <thread>
 
 
+// globally list gnodeb-id we use
+std::string gNodeBID = "abc123";
+
 // global queue for testing
-std::queue<std::string> message_bus;
+// acts like a channel 
+std::queue<std::pair<std::string, std::string>> message_bus;
 
 // global lock for testing
 std::mutex get_object ;
 
 bool is_running = true;
 
-bool mock_fail(int mtype,  size_t len,  void * payload, int mode){
+
+// ==================================================
+// various mock transmission functions that simulate underlying
+// transmission layer behaviour
+
+// this function immediately fails
+// simulates a channel that is not available
+bool mock_fail(int mtype,  size_t len,  void * payload, std::string gNodeB_id, int mode){
   return false;
 }
 
-bool mock_silent(int mtype,  size_t len, void * payload, int mode){
+// silently returns without actually doing any transmission
+// simulates a lost transmission
+bool mock_silent(int mtype,  size_t len, void * payload, std::string gNodeB_id, int mode){
   return true;
 }
 
-
-bool mock_tx(int mytpe,  size_t len,  void *payload, int mode){
+// simulates a working transmission channel 
+bool mock_tx(int mytpe,  size_t len,  void *payload, std::string gNodeB_id, int mode){
 
   bool res;
   int i;
@@ -170,14 +183,13 @@ bool mock_tx(int mytpe,  size_t len,  void *payload, int mode){
     std::lock_guard<std::mutex> guard(get_object);
     std::string msg((char *)buffer, buf_size);
     //std::cout <<"Pushed to queue" << std::endl;
-    message_bus.push(msg);
+    message_bus.push(std::make_pair(gNodeB_id, msg));
   }
 
   
   ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_pdu_recv);
   if(msg_ok)
-    return true;
-  
+    return true;  
   else
     return false;
     
@@ -186,71 +198,19 @@ bool mock_tx(int mytpe,  size_t len,  void *payload, int mode){
 
 
 
-// Randomly generate number of subscription response and delete
-// response packets and push to queue
-void random_tx(int num_packets){
-  subscription_response_helper he_resp;
-  subscription_response sub_resp;
-  subscription_delete sub_del_req;
-  subscription_delete_response sub_del_resp;
-  bool res;
-  unsigned char buffer[256];
-  size_t buf_size = 256;
-
-  he_resp.add_action(10);
-  
-  // generate subscription responses
-  for(int i = 0; i < num_packets; i++){
-    
-    // set up response object
-    he_resp.set_request(i, 1);
-    he_resp.set_function_id(0); 
-    buf_size = 256;
-    res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size,  he_resp, true);
-    {
-        std::lock_guard<std::mutex> guard(get_object);
-       std::string msg((char *)buffer, buf_size);
-       message_bus.push(msg);
-    }
-    
-    buf_size = 256;
-    res = sub_resp.encode_e2ap_subscription_response(buffer, &buf_size,  he_resp, false);
-    {
-        std::lock_guard<std::mutex> guard(get_object);
-       std::string msg((char *)buffer, buf_size);
-       message_bus.push(msg);
-    }
-    
-    buf_size = 256;
-    res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, true);
-    {
-      std::lock_guard<std::mutex> guard(get_object);
-      std::string msg((char *)buffer, buf_size);
-      message_bus.push(msg);
-    }
-
-    buf_size = 256;
-    res = sub_del_resp.encode_e2ap_subscription_delete_response(buffer, &buf_size,  he_resp, false);
-    {
-      std::lock_guard<std::mutex> guard(get_object);
-      std::string msg((char *)buffer, buf_size);
-      message_bus.push(msg);
-    }
-    
-    
-  }
-}
 
+// simulates response :takes what is in the queue, processes it and then invokes
+// subscription_handler response 
+void  mock_RAN (subscription_handler &_ref_sub_handler, int delay){
 
-void  mock_RAN (subscription_handler &_ref_sub_handler){
   // Behaviour :
-  
+  std::string gNodeB_id;
   unsigned char incorrect_e2ap[128];
   size_t incorrect_e2ap_size = 128;
   for(int i = 0; i < 128; i++){
     incorrect_e2ap[i] = 'b';
   }
-
+  
   FILE *pfile = fopen("test-data/e2ap_indication_test.per", "r");
   if(pfile == NULL){
     std::cout <<  "Error opening e2ap_indication_test.per" << std::endl;
@@ -261,40 +221,46 @@ void  mock_RAN (subscription_handler &_ref_sub_handler){
   fclose(pfile);
 
   unsigned char message_buf[512];
-  std::string pdu;
-  
+  std::pair<std::string, std::string> pdu;
+
+  bool is_resp;
   while(is_running){
-    // send some random data, i.e incorrect E2AP
-    _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size);
+    
+    // test illegal  packet response : send some random data, i.e incorrect E2AP
+    _ref_sub_handler.Response(RIC_INDICATION, incorrect_e2ap, incorrect_e2ap_size, gNodeBID.c_str());
     //std::cout <<"Sent random data to subscription handler" << std::endl;
     
-    //  send an E2AP which is not subscription request
-    _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size);
+    //test incorrect packet response :  send an E2AP which is not subscription request
+    _ref_sub_handler.Response(RIC_INDICATION, e2ap_msg, e2ap_msg_size, gNodeBID.c_str());
     //std::cout <<"Sent incorrect e2ap  to subscription handler" << std::endl;
     
     // now look in the queue, pop it and send the data
-    // finally send correct payload
+    // finally send correct payload if queue not empty
+    is_resp = false;
     {
       std::lock_guard<std::mutex> guard(get_object);
       if(! message_bus.empty()){
        pdu  = message_bus.front();
-       memcpy(message_buf,  pdu.c_str(), pdu.length());
+       gNodeB_id = pdu.first;
+       memcpy(message_buf,  pdu.second.c_str(), pdu.second.length());
        message_bus.pop();
+       is_resp =true;
       }
     }
-    
-    _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.length());
-    //std::cout <<"Sent  response to subscription handler" << std::endl;
-      
-    
-    
-    sleep(1);
+
+    if(is_resp){
+      sleep(delay);
+      _ref_sub_handler.Response(RIC_SUB_RESP, message_buf, pdu.second.length(), gNodeB_id.c_str());
+      //std::cout <<"Sent  response to subscription handler" << std::endl;
+    }
+         
   }
   
 }
 
+// wrapper function that we use to test sending subscriptions with various channels
+void send_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, std::vector<std::string> & gNodeBs, int index, bool (*tx)(int,  size_t, void *,  std::string, int), int mode ){
 
-void send_request(subscription_handler &subscription_manager, std::vector<int> & status_vector, int index, bool (*tx)(int,  size_t, void *, int), int mode ){
   subscription_helper subscription_info;
   subscription_request sub_req;
   subscription_response_helper subscription_response_info;
@@ -333,12 +299,11 @@ void send_request(subscription_handler &subscription_manager, std::vector<int> &
   subscription_info.add_action(action_id, action_type);
   subscription_info.set_event_def(&event_buf[0], event_buf_len);
 
-  auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
-  res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
+  auto transmitter = std::bind(tx, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index],  mode);
+  res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBs[index], RIC_SUB_REQ, transmitter);
   
   if (res == SUBSCR_SUCCESS ){
-    // store -ve of request id 
-    status_vector[index] = -1 * subscription_info.get_request_id();
+    status_vector[index] = -1 ;
   }
   else{
     status_vector[index] = res;
@@ -347,24 +312,26 @@ void send_request(subscription_handler &subscription_manager, std::vector<int> &
   std::cout <<"Subscription = " << subscription_info.get_request_id() << " Result = " << res << std::endl;
 }
 
-void delete_request(subscription_handler &subscription_manager, std::vector<int> & status_vector,  int index, int request_id, bool ( *tx)(int,  size_t, void *, int), int mode ){
+
+// wrapper function that we use to test sending delete requests with various channels
+void delete_request(subscription_handler &subscription_manager, std::vector<int> & status_vector,  std::vector<std::string> & gNodeBs, int index, int request_id, bool ( *tx)(int,  size_t, void *, std::string, int), int mode ){
 
   subscription_helper subscription_info;
   subscription_response_helper subscription_response_info;
 
  
   //verify subscription deleted
-  subscription_info.set_request(request_id, 1);
+  subscription_info.set_request(0, 0);
   subscription_info.set_function_id(0);
 
-  auto transmitter = std::bind(tx,  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, mode);
-  status_vector[index] = subscription_manager.RequestSubscriptionDelete(subscription_info, subscription_response_info, RIC_SUB_DEL_REQ, transmitter);
+  auto transmitter = std::bind(tx,  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBs[index], mode);
+  status_vector[index] = subscription_manager.request_subscription_delete(subscription_info, subscription_response_info, gNodeBs[index], RIC_SUB_DEL_REQ,  transmitter);
   
          
 };
 
 
-TEST_CASE("Test subscription work flow", "E2AP Subscription"){
+TEST_CASE("Test various channel responses", "E2AP Subscription"){
 
   subscription_handler subscription_manager;
     
@@ -372,18 +339,10 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
   mdclog_attr_init(&attr);
   mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
   mdclog_init(attr);
-  mdclog_level_set(MDCLOG_DEBUG);
+  mdclog_level_set(MDCLOG_INFO);
   mdclog_attr_destroy(attr);
 
-  unsigned char node_buffer[32];
-  std::string gNodeB = "TEST_GNOBDE";
-
-  std::copy(gNodeB.begin(), gNodeB.end(), node_buffer);
-  node_buffer[gNodeB.length()] = '\0';
-  
-
-  //====================================
-  
+  //====================================  
   SECTION("Verify behaviour if no listener "){
     std::cout <<"+++++++++" << std::endl << "TEST WITH NO LISTENER " << std::endl;
     
@@ -392,9 +351,13 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     subscription_manager.clear();
     
     std::vector<std::thread> source_list;
+    std::vector<std::string> gNodeBs;
+    for(int i = 0; i < num_sources; i++){
+      gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+    }
     
     for(int i = 0; i < num_sources; i++){
-      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_fail), 0));
+      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_fail), 0));
     }
     
     for(int i = 0; i < num_sources; i++){
@@ -415,9 +378,13 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     subscription_manager.clear();
 
     std::vector<std::thread> source_list;
-    
+        std::vector<std::string> gNodeBs;
+    for(int i = 0; i < num_sources; i++){
+      gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+    }
+
     for(int i = 0; i < num_sources; i++){
-      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_silent), 0));
+      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_silent), 0));
     }
     
     for(int i = 0; i < num_sources; i++){
@@ -433,6 +400,20 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     
   }
   
+}
+
+
+TEST_CASE("Test config", "E2AP Subscription"){
+
+  subscription_handler subscription_manager;
+    
+  mdclog_attr_t *attr;
+  mdclog_attr_init(&attr);
+  mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
+  mdclog_init(attr);
+  mdclog_level_set(MDCLOG_INFO);
+  mdclog_attr_destroy(attr);
+
   SECTION("Verify timeout behaviour if listener does not response"){
     std::cout <<"+++++++++" << std::endl << "TEST TIMEOUT BEHAVIOUR  " << std::endl;
     
@@ -441,13 +422,18 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     int timeout_val = 2;
     int num_tries = 1;
     std::vector<int> status_vector(num_sources, 0);
-     
+
+    std::vector<std::string> gNodeBs;
+    for(int i = 0; i < num_sources; i++){
+      gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+    }
+
     subscription_manager.clear();
     subscription_manager.set_timeout(timeout_val);
     subscription_manager.set_num_retries(num_tries);
     
     auto start = std::chrono::steady_clock::now();
-    send_request(subscription_manager, status_vector, 0, mock_silent, 0);
+    send_request(subscription_manager, status_vector, gNodeBs, 0, mock_silent, 0);
     auto end = std::chrono::steady_clock::now();
 
     auto diff = end - start;
@@ -460,7 +446,7 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     status_vector.clear();
 
     start = std::chrono::steady_clock::now();
-    send_request(subscription_manager, status_vector, 0, mock_silent, 0);
+    send_request(subscription_manager, status_vector, gNodeBs,  0, mock_silent, 0);
     end = std::chrono::steady_clock::now();
 
     diff = end - start;
@@ -470,7 +456,184 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
 
   }
 
+}
+
+TEST_CASE("Test sunny day scenarios", "E2AP Subscription"){
+
+  subscription_handler subscription_manager;
+    
+  mdclog_attr_t *attr;
+  mdclog_attr_init(&attr);
+  mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
+  mdclog_init(attr);
+  mdclog_level_set(MDCLOG_INFO);
+  mdclog_attr_destroy(attr);
+
+  SECTION("Verify subscription request/response success"){
+    std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl;    
+    
+    subscription_manager.clear();
+    
+    int num_sources = 10;
+    int num_sinks = 5;
+    
+    std::vector<std::thread> source_list;
+    std::vector<std::thread> sink_list;
+    
+    std::vector<int> status_vector(num_sources, 0);
+    
+    // First Test null cases in queries,i.e non-existing request 
+    subscription_identifier id = std::make_tuple (gNodeBID, 10);
+     REQUIRE(subscription_manager.is_subscription_entry(id) == false);
+     id = std::make_tuple(gNodeBID, 1);
+     REQUIRE(subscription_manager.is_request_entry(id) == false);
+     REQUIRE(subscription_manager.get_request_status(id) == -1);
+     
+     // start up the sinks
+     is_running = true;
+     for(int i = 0; i < num_sinks; i++){
+       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+     }
+
+     // generate the gnodeb list for which we are subscribing
+     // default ran_function_id is zero 
+     std::vector<std::string> gNodeBs;
+     for(int i = 0; i < num_sources; i++){
+       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+     }
+     
+     for(int i = 0; i < num_sources; i++){
+       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_tx), 0));
+     }
+     
+     
+     for(int i = 0; i < num_sources; i++){
+       source_list[i].join();
+       REQUIRE(status_vector[i] < 0);
+       id = std::make_tuple(gNodeBs[i], 0);
+       REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+     }
+     
+     // stop the sinks
+     is_running =false;
+     for(int i = 0; i < num_sinks; i++){
+       sink_list[i].join();
+     }
+     
+     REQUIRE(subscription_manager.num_complete() == num_sources);
+     REQUIRE(subscription_manager.num_pending() == 0);
+     
+     // test getting subscription :
+     // case 1: fake request
+     id = std::make_tuple(gNodeBID, 0);
+     const subscription_response_helper *  sub_info = subscription_manager.get_subscription(id);
+     REQUIRE(sub_info == NULL);
+
+     // case 2: valid request : get all the keys and use them
+     std::vector<subscription_identifier> key_list;
+     subscription_manager.get_subscription_keys(key_list);
+     REQUIRE(key_list.size() == subscription_manager.num_complete());
+     for(auto &e: key_list){
+       sub_info = subscription_manager.get_subscription(e);
+       REQUIRE(sub_info != NULL);
+     }
+   }
+
+
+
+  SECTION("Delete requests that have succeeeded"){
+    std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl;
+    
+    subscription_manager.clear();
+    
+    int num_sources = 10;
+    int num_sinks = 5;
+    const subscription_response_helper * sub_resp_info;
+    
+    std::vector<std::thread> source_list;
+    std::vector<std::thread> sink_list;
+    
+    std::vector<int> status_vector(num_sources, 0);
+    std::vector<std::string> gNodeBs;
+    
+    // generate the gnodeb list for which we are subscribing
+    // default ran_function_id is zero 
+    for(int i = 0; i < num_sources; i++){
+      gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+    }
+    
+    // start up the sinks
+    is_running = true;
+    for(int i = 0; i < num_sinks; i++){
+      sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+    }
+    
+    // First do subscriptions ...
+    for(int i = 0; i < num_sources; i++){
+      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_tx), 0));
+    }
+    
+    
+    for(int i = 0; i < num_sources; i++){
+      source_list[i].join();
+      REQUIRE(status_vector[i] < 0 );
+      subscription_identifier id = std::make_tuple (gNodeBs[i], 0);       
+      REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+      sub_resp_info = subscription_manager.get_subscription(id);
+      REQUIRE(sub_resp_info != NULL);
+      REQUIRE(sub_resp_info->get_request_id() == 0);
+      
+    }
+    
+    REQUIRE(subscription_manager.num_complete() == num_sources);
+    REQUIRE(subscription_manager.num_pending() == 0);
+    
+    
+    // Store ids ..
+    std::vector<int> completed_requests = status_vector;
+    
+    // Delete successes
+    source_list.clear();
+    for(int i = 0; i < num_sources; i++){
+      source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i],  std::ref(mock_tx), 0));
+    }
+    
+    
+    for(int i = 0; i < num_sources; i++){
+      source_list[i].join();
+      REQUIRE(status_vector[i] == SUBSCR_SUCCESS);
+    }
+    
+    REQUIRE(subscription_manager.num_pending() == 0);
+    
+    
+    // stop the sinks
+    is_running =false;
+    for(int i = 0; i < num_sinks; i++){
+      sink_list[i].join();
+    }
+    
+    REQUIRE(subscription_manager.num_complete() == 0);     
+    REQUIRE(subscription_manager.num_pending() == 0);
+    
+  }
+
+  
+}
+
+TEST_CASE("Test rainy day scenarios", "E2AP Subscription"){
+
+  subscription_handler subscription_manager;
+    
+  mdclog_attr_t *attr;
+  mdclog_attr_init(&attr);
+  mdclog_attr_set_ident(attr, "MOCK TEST SUBSCRIPTION WORK FLOW ");
+  mdclog_init(attr);
+  mdclog_level_set(MDCLOG_INFO);
+  mdclog_attr_destroy(attr);
+
   SECTION("Verify rejection of illegal pdus"){
+
     std::cout <<"+++++++++" << std::endl <<"TEST WITH ILLEGAL PDU PARAMS" << std::endl;
     subscription_helper subscription_info;
     subscription_response_helper subscription_response_info;
@@ -483,8 +646,6 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     
     int request_id = 1;
     int req_seq = 1;
-    int message_type = 1;
-    int procedure_code = 27;
 
     unsigned char event_buf[] = "Hello world";
     size_t event_buf_len = strlen((const char *)event_buf);
@@ -495,13 +656,11 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     subscription_info.add_action(action_id, action_type);
     subscription_info.set_event_def(&event_buf[0], event_buf_len);
 
-
-    auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, 0);
-    res = subscription_manager.RequestSubscription(subscription_info, subscription_response_info , RIC_SUB_REQ, transmitter);
+    std::vector<std::string> gNodeBs;
+    auto transmitter = std::bind(mock_silent, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, gNodeBID, 0);
+    res = subscription_manager.request_subscription(subscription_info, subscription_response_info , gNodeBID, RIC_SUB_REQ, transmitter);
     REQUIRE(res == SUBSCR_ERR_ENCODE);
 
-    
-
   }
   
   SECTION("Verify subscription request/response fail"){
@@ -516,15 +675,22 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
     std::vector<std::thread> sink_list;
 
     std::vector<int> status_vector(num_sources, 0);
+    
+    // generate the gnodeb list for which we are subscribing
+    // default ran_function_id is zero 
+    std::vector<std::string> gNodeBs;
+    for(int i = 0; i < num_sources; i++){
+      gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+    }
 
     // start up the sinks
     is_running = true;
     for(int i = 0; i < num_sinks; i++){
-      sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+      sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
     }
     
     for(int i = 0; i < num_sources; i++){
-      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), -1));
+      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), -1));
     }
 
     
@@ -544,76 +710,31 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
 
   }
 
-   SECTION("Verify subscription request/response success"){
-     std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION SUCCESS " << std::endl;
+  SECTION("Delete requests for non-existent subscription requests"){
+    std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl;
     
-
-     subscription_manager.clear();
+    subscription_manager.clear();
+    subscription_identifier id = std::make_tuple (gNodeBID, 0); 
+    REQUIRE(subscription_manager.get_request_status(id) == -1);
+    REQUIRE(subscription_manager.is_subscription_entry(id) == false);
+    REQUIRE(subscription_manager.is_request_entry(id) == false);
     
      int num_sources = 10;
-     int num_sinks = 5;
-     
-     std::vector<std::thread> source_list;
-     std::vector<std::thread> sink_list;
      
+     std::vector<std::thread> source_list;   
      std::vector<int> status_vector(num_sources, 0);
+     srand(100);
+     std::vector<std::string> gNodeBs;
 
-     // Test null cases in queries
-     REQUIRE(subscription_manager.is_subscription_entry(10) == false);
-     REQUIRE(subscription_manager.is_request_entry(1) == false);
-     REQUIRE(subscription_manager.get_request_status(1) == -1);
-     
-     // start up the sinks
-     is_running = true;
-     for(int i = 0; i < num_sinks; i++){
-       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
-     }
-     
-     for(int i = 0; i < num_sources; i++){
-       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
-     }
-     
-     
+     // generate the gnodeb list for which we are subscribing
+     // default ran_function_id is zero 
      for(int i = 0; i < num_sources; i++){
-       source_list[i].join();
-       REQUIRE(status_vector[i] < 0);
-       REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
-     }
-     
-     // stop the sinks
-     is_running =false;
-     for(int i = 0; i < num_sinks; i++){
-       sink_list[i].join();
+       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
      }
-     
-     REQUIRE(subscription_manager.num_complete() == num_sources);
-     REQUIRE(subscription_manager.num_pending() == 0);
-
-     const subscription_response_helper *  sub_info = subscription_manager.get_subscription(-1);
-     REQUIRE(sub_info == NULL);
-
-     sub_info = subscription_manager.get_subscription(-1 * status_vector[0]);
-     REQUIRE(sub_info != NULL);
-     REQUIRE(sub_info->get_request_id() == -1 * status_vector[0]);
-     
-   }
 
-   SECTION("Delete requests for non-existent subscription requests"){
-     std::cout <<"+++++++++" << std::endl << "TEST SUBSCRIPTION DELETE WITH NO CORRESPONDING REQUEST" << std::endl;
-    
-     subscription_manager.clear();
-     REQUIRE(subscription_manager.get_request_status(0) == -1);
-     REQUIRE(subscription_manager.is_subscription_entry(0) == false);
-     REQUIRE(subscription_manager.is_request_entry(0) == false);
-     
-     int num_sources = 10;
-     
-     std::vector<std::thread> source_list;   
-     std::vector<int> status_vector(num_sources, 0);
-     srand(100);
      for(int i = 0; i < num_sources; i++){
        int req_id = rand()%1000;
-       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i,req_id ,  std::ref(mock_tx), 0));
+       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i,req_id ,  std::ref(mock_tx), 0));
      }
      
      
@@ -622,79 +743,10 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
        REQUIRE(status_vector[i] == SUBSCR_ERR_MISSING);
      }
      
-     
    }
 
-   
-
-   SECTION("Delete requests that have succeeeded"){
-     std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES " << std::endl;
-    
-     subscription_manager.clear();
-    
-     int num_sources = 10;
-     int num_sinks = 5;
-     const subscription_response_helper * sub_resp_info;
-     
-     std::vector<std::thread> source_list;
-     std::vector<std::thread> sink_list;
-     
-     std::vector<int> status_vector(num_sources, 0);
-     
-     // start up the sinks
-     is_running = true;
-     for(int i = 0; i < num_sinks; i++){
-       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
-     }
-
-     // First do subscriptions ...
-     for(int i = 0; i < num_sources; i++){
-       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
-     }
-     
-     
-     for(int i = 0; i < num_sources; i++){
-       source_list[i].join();
-       REQUIRE(status_vector[i] < 0 );
-       REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
-       sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
-       REQUIRE(sub_resp_info != NULL);
-       REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
-       
-     }
-
-     REQUIRE(subscription_manager.num_complete() == num_sources);
-     REQUIRE(subscription_manager.num_pending() == 0);
-
-
-     // Store ids ..
-     std::vector<int> completed_requests = status_vector;
-     
-     // Delete successes
-     source_list.clear();
-     for(int i = 0; i < num_sources; i++){
-       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i],  std::ref(mock_tx), 0));
-     }
-     
-     
-     for(int i = 0; i < num_sources; i++){
-       source_list[i].join();
-       REQUIRE(status_vector[i] == SUBSCR_SUCCESS);
-     }
-
-     REQUIRE(subscription_manager.num_pending() == 0);
-
-
-     // stop the sinks
-     is_running =false;
-     for(int i = 0; i < num_sinks; i++){
-       sink_list[i].join();
-     }
-     REQUIRE(subscription_manager.num_complete() == 0);     
-     REQUIRE(subscription_manager.num_pending() == 0);
-  
-   }
 
+   
    SECTION("Deletes that fail"){
 
      std::cout <<"+++++++++" << std::endl << "TEST WITH SUBSCRIPTION DELETES THAT FAIL " << std::endl;
@@ -712,22 +764,30 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
      // start up the sinks
      is_running = true;
      for(int i = 0; i < num_sinks; i++){
-       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+     }
+
+     // generate the gnodeb list for which we are subscribing
+     // default ran_function_id is zero 
+     std::vector<std::string> gNodeBs;
+     for(int i = 0; i < num_sources; i++){
+       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
      }
 
      // First do subscriptions ...
      for(int i = 0; i < num_sources; i++){
-       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
+       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
      }
      
      
      for(int i = 0; i < num_sources; i++){
        source_list[i].join();
        REQUIRE(status_vector[i] < 0 );
-       REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
-       sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
+       subscription_identifier id = std::make_tuple(gNodeBs[i], 0); 
+       REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+       sub_resp_info = subscription_manager.get_subscription(id);
        REQUIRE(sub_resp_info != NULL);
-       REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
+       REQUIRE(sub_resp_info->get_request_id() == 0);
        
      }
 
@@ -735,13 +795,13 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
      REQUIRE(subscription_manager.num_pending() == 0);
 
 
-     // Store ids ..
+     // Store status results 
      std::vector<int> completed_requests = status_vector;
     
-     // Delete failures
+     // Delete failures : mock_tx set to respond with failure
      source_list.clear();
      for(int i = 0; i < num_sources; i++){
-       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i],  std::ref(mock_tx), 1));
+       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i],  std::ref(mock_tx), 1));
      }
 
      for(int i = 0; i < num_sources; i++){
@@ -756,7 +816,7 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
        sink_list[i].join();
      }
      
-     // subscriptions are still there
+     // subscriptions are still there (did not get deleted)
      REQUIRE(subscription_manager.num_complete() == num_sources);
      REQUIRE(subscription_manager.num_pending() == 0);
 
@@ -777,26 +837,34 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
      std::vector<std::thread> sink_list;
      
      std::vector<int> status_vector(num_sources, 0);
+     std::vector<std::string> gNodeBs;
+
+     // generate the gnodeb list for which we are subscribing
+     // default ran_function_id is zero 
+     for(int i = 0; i < num_sources; i++){
+       gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+     }
      
      // start up the sinks
      is_running = true;
      for(int i = 0; i < num_sinks; i++){
-       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
      }
 
      // First do subscriptions ...
      for(int i = 0; i < num_sources; i++){
-       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), i, std::ref(mock_tx), 0));
+       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
      }
      
      
      for(int i = 0; i < num_sources; i++){
        source_list[i].join();
        REQUIRE(status_vector[i] <  0 );
-       REQUIRE(subscription_manager.is_subscription_entry(-1 * status_vector[i]) == true);
-       sub_resp_info = subscription_manager.get_subscription(-1 * status_vector[i]);
+       subscription_identifier id = std::make_tuple(gNodeBs[i], 0); 
+       REQUIRE(subscription_manager.is_subscription_entry(id) == true);
+       sub_resp_info = subscription_manager.get_subscription(id);
        REQUIRE(sub_resp_info != NULL);
-       REQUIRE(sub_resp_info->get_request_id() == -1 * status_vector[i]);
+       REQUIRE(sub_resp_info->get_request_id() == 0);
        
      }
 
@@ -810,7 +878,7 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
      // Delete with  time-outs 
      source_list.clear();
      for(int i = 0; i < num_sources; i++){
-       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), i, -1 * completed_requests[i],  std::ref(mock_silent), 0));
+       source_list.push_back(std::thread(delete_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, -1 * completed_requests[i],  std::ref(mock_silent), 0));
      }
      
      
@@ -830,33 +898,98 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
 
    }
    
-   SECTION("Spurious messages"){
-     std::cout <<"+++++++++" << std::endl << "TEST WITH SPURIOUS RESPONSES" << std::endl;
+
    
-     // In this section, we basically inject
-     // spurious messages to subscription handler.
-     // There are no outcomes. basically
-     // handler should be able to ignore these messages
+  SECTION("Verify timeout behaviour if transmitter sends after delay"){
+    std::cout <<"+++++++++" << std::endl << "TEST DELAYED ARRIVAL OF SUBSCRIPTIONS " << std::endl;    
+    
+    subscription_manager.clear();
+    int num_sources = 10;
+    int num_sinks = 5;
+    
+    std::vector<std::thread> source_list;
+    std::vector<std::thread> sink_list;
+    
+    std::vector<int> status_vector(num_sources, 0);
 
-     int num_packets = 50;
-     int num_sinks = 10;
-     std::vector<std::thread> sink_list;
+    // set subscription manager timeout on short fuse
+    int time_out = 1;
+    int num_tries = 1;
+    subscription_manager.set_timeout(time_out);
+    subscription_manager.set_num_retries(num_tries);
+
+    // start up the sinks with delayed response
+    is_running = true;
+    int delay = 5;
+    for(int i = 0; i < num_sinks; i++){
+      sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), delay));
+    }
+
+    
+    // generate the gnodeb list for which we are subscribing
+    // default ran_function_id is zero 
+    std::vector<std::string> gNodeBs;
+    for(int i = 0; i < num_sources; i++){
+      gNodeBs.push_back("test-gnodeb-" + std::to_string(i));
+    }
+     
+    for(int i = 0; i < num_sources; i++){
+      source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs),  i, std::ref(mock_tx), 0));
+    }
+     
+     
+    for(int i = 0; i < num_sources; i++){
+      source_list[i].join();
+      REQUIRE(status_vector[i]  == SUBSCR_ERR_TIMEOUT);
+    }
+    
+    // stop the sinks
+    is_running =false;
+    for(int i = 0; i < num_sinks; i++){
+      sink_list[i].join();
+    }
+    
+    REQUIRE(subscription_manager.num_complete() == 0);
+    REQUIRE(subscription_manager.num_pending() == 0);
+     
+  }
+  
+   SECTION("Duplicate requests"){
+     std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION REQUESTS " << std::endl;
      
      subscription_manager.clear();
-     std::cout <<"Message queue size prior to fill  = " << message_bus.size() << std::endl;
-     random_tx(num_packets);
-     std::cout <<"Message queue size post  fill  = " << message_bus.size() << std::endl;
-
+     
+     int num_sources = 20;
+     int num_sinks = 5;
+     
+     std::vector<std::thread> source_list;
+     std::vector<std::thread> sink_list;
+     
+     std::vector<int> status_vector(num_sources, 0);
+     
+     // generate IDENTICAL  gnodeb list for which we are subscribing
+     // default ran_function_id is zero 
+     std::vector<std::string> gNodeBs;
+     for(int i = 0; i < num_sources; i++){
+       gNodeBs.push_back("test-gnodeb-" + std::to_string(0));
+     }
 
      // start up the sinks
      is_running = true;
      for(int i = 0; i < num_sinks; i++){
-       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager)));
+       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+     }
+     
+     // send out subscriptions 
+     for(int i = 0; i < num_sources; i++){
+       source_list.push_back(std::thread(send_request, std::ref(subscription_manager), std::ref(status_vector), std::ref(gNodeBs), i, std::ref(mock_tx), 0));
      }
 
-     // wait for queue to drain out
-     while(! message_bus.empty()){
-       sleep(2);
+     // exactly ONE subscription should succeed. all others should fail with SUBSCR_ERR_DUPLICATE
+     for(int i = 0; i < num_sources; i++){
+       source_list[i].join();
+       REQUIRE( (status_vector[i] == -1  || status_vector[i] == SUBSCR_ERR_DUPLICATE));
+       
      }
      
      // stop the sinks
@@ -864,14 +997,57 @@ TEST_CASE("Test subscription work flow", "E2AP Subscription"){
      for(int i = 0; i < num_sinks; i++){
        sink_list[i].join();
      }
-     REQUIRE(subscription_manager.num_complete() == 0);     
+     
+     REQUIRE(subscription_manager.num_complete() == 1);     
+   }
+
+
+   SECTION("Duplicate responses"){
+     // this scenario can happen if there was an initial successful
+     // subscription with <gnodeb-id, ran-function-id> request
+     // followed by another one. The response for the second one should
+     // result in a duplicate subscription error
+
+
+     std::cout <<"+++++++++" << std::endl << "TEST DUPLICATE SUBSCRIPTION RESPONSES" << std::endl;
+     
+     subscription_manager.clear();
+
+     int num_sources = 1;
+     int num_sinks = 1;
+     std::vector<int> status_vector (num_sources, 0);
+     std::vector<std::string> gNodeBs;
+     gNodeBs.push_back("test-gnodeb");
+     
+     std::vector<std::thread> sink_list;
+     // start up the sinks
+     is_running = true;
+     for(int i = 0; i < num_sinks; i++){
+       sink_list.push_back(std::thread(mock_RAN, std::ref(subscription_manager), 1));
+     }
+
+     // send a subscription  : this should succeed
+     send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0);
+     REQUIRE(status_vector[0] == -1);
      REQUIRE(subscription_manager.num_pending() == 0);
+     REQUIRE(subscription_manager.num_complete() == 1);
 
-     std::cout <<"Message queue size at end  = " << message_bus.size() << std::endl;
 
+     // now send same subscription again
+     send_request(subscription_manager, status_vector, gNodeBs, 0, mock_tx, 0);
+     REQUIRE(status_vector[0] == SUBSCR_ERR_DUPLICATE);
+     REQUIRE(subscription_manager.num_pending() == 0);
+     REQUIRE(subscription_manager.num_complete() == 1);
+     
+     // stop the sinks
+     is_running =false;
+     for(int i = 0; i < num_sinks; i++){
+       sink_list[i].join();
+     }
+     
    }
-   
-};
+     
+}