Changes to Subscription Handler 14/4814/1
authorsjana <sj492a@att.com>
Tue, 6 Oct 2020 21:29:57 +0000 (17:29 -0400)
committersjana <sj492a@att.com>
Tue, 6 Oct 2020 21:46:36 +0000 (17:46 -0400)
Issue-ID: RICAPP-148

Signed-off-by: sjana <sj492a@att.com>
Change-Id: Ifa2a521bca16290219a6ef294bb96d1bc1f31195

src/hw_xapp_main.cc
src/xapp-mgmt/subs_mgmt.cc
src/xapp-mgmt/subs_mgmt.hpp
test/test_subs.h

index 1771797..a4a5eef 100644 (file)
@@ -60,29 +60,36 @@ int main(int argc, char *argv[]){
 
 
        //Create Subscription Handler if Xapp deals with Subscription.
-       std::unique_ptr<SubscriptionHandler> sub_handler = std::make_unique<SubscriptionHandler>();
+       //std::unique_ptr<SubscriptionHandler> sub_handler = std::make_unique<SubscriptionHandler>();
+
+       SubscriptionHandler sub_handler;
 
        //create HelloWorld Xapp Instance.
        std::unique_ptr<Xapp> hw_xapp;
        hw_xapp = std::make_unique<Xapp>(std::ref(config),std::ref(*rmr));
 
        mdclog_write(MDCLOG_INFO, "Created Hello World Xapp Instance");
-
-       sleep(1);
        //Startup E2 subscription and A1 policy
-       hw_xapp->startup(std::ref(*sub_handler));
+       hw_xapp->startup(sub_handler);
+
+       sleep(10);
 
 
        //start listener threads and register message handlers.
        int num_threads = std::stoi(config[XappSettings::SettingName::THREADS]);
        mdclog_write(MDCLOG_INFO, "Starting Listener Threads. Number of Workers = %d", num_threads);
 
-       std::unique_ptr<XappMsgHandler> mp_handler = std::make_unique<XappMsgHandler>(config[XappSettings::SettingName::XAPP_ID]);
+       std::unique_ptr<XappMsgHandler> mp_handler = std::make_unique<XappMsgHandler>(config[XappSettings::SettingName::XAPP_ID], sub_handler);
+
        hw_xapp->start_xapp_receiver(std::ref(*mp_handler));
 
        sleep(1);
 
-       //xapp->shutdown();
+
+
+    //hw_xapp->Run() //for spinning multiple receiving threads.
+
+       //hw_xapp->shutdown();
 
        while(1){
                                sleep(1);
index 58da73c..dca9412 100644 (file)
  * Author: Ashwin Shridharan, Shraboni Jana
  */
 #include "subs_mgmt.hpp"
-
+#include <thread>
 #include <errno.h>
 
-
 SubscriptionHandler::SubscriptionHandler(unsigned int timeout_seconds):_time_out(std::chrono::seconds(timeout_seconds)){
          _data_lock = std::make_unique<std::mutex>();
          _cv = std::make_unique<std::condition_variable>();
@@ -67,10 +66,10 @@ bool SubscriptionHandler::delete_request_entry(transaction_identifier id){
 
   if (search != status_table.end()){
     status_table.erase(search);
-    mdclog_write(MDCLOG_INFO,"Entry for Transaction ID deleted: %s",id.c_str());
+    mdclog_write(MDCLOG_INFO,"Entry for Transaction ID deleted: %d",id);
     return true;
   }
-  mdclog_write(MDCLOG_INFO,"Entry not found in SubscriptionHandler for Transaction ID: %s",id.c_str());
+  mdclog_write(MDCLOG_INFO,"Entry not found in SubscriptionHandler for Transaction ID: %d",id);
 
   return false;
 };
@@ -79,48 +78,63 @@ bool SubscriptionHandler::delete_request_entry(transaction_identifier id){
 bool SubscriptionHandler::set_request_status(transaction_identifier id, transaction_status status){
 
   // change status of a request only if it exists.
-  auto search = status_table.find(id);
-  if(search != status_table.end()){
-    status_table[id] = status;
-    return true;
-  }
-
+       for(auto &it:status_table){
+                       if(strcmp(it.first.c_str(), id.c_str())==0) {
+                               it.second = status;
+                               return true;
+                       }
+               }
   return false;
-  
+
 };
 
 
-int const SubscriptionHandler::get_request_status(transaction_identifier id){
-  auto search = status_table.find(id);
-  if (search == status_table.end()){
-    return -1;
-  }
-  
-  return search->second;
+int SubscriptionHandler::get_request_status(transaction_identifier id){
+
+       for(auto it:status_table){
+               if(strcmp(it.first.c_str(), id.c_str())==0) {
+                       return it.second;
+               }
+       }
+
+
+  return -1;
 }
                                   
 
 
 bool SubscriptionHandler::is_request_entry(transaction_identifier id){
-  auto search = status_table.find(id);
-  if (search != status_table.end())
-    return true;
-  else
+       for(auto it:status_table){
+                       if(strcmp(it.first.c_str(), id.c_str())==0) {
+                               return true;
+                       }
+               }
     return false;
 }
 
+
+
+
 // Handles subscription responses
 void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id){
+       // Make This Thread sleep for 1 Second
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  {
+         std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+         mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for me id %s WAS: %d",id.c_str(),this->get_request_status(id));
 
-  bool res;
-  std::cout << "In Manage subscription" << std::endl;
+         //from the message type we can know if its a success/failure etc.
+         if(message_type==RIC_SUB_RESP)
+         this->set_request_status(id, request_success);
 
-  // wake up all waiting users ...
-  if(is_request_entry(id)){
-         std::cout << "In Manage subscription" << std::endl;
-         set_request_status(id, request_success);
-     _cv.get()->notify_all();
-  }
+         if(message_type==RIC_SUB_FAILURE)
+         this->set_request_status(id,request_failed);
+
+         mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for me id %s IS: %d",id.c_str(),this->get_request_status(id));
+
+         //this->print_subscription_status();
+   }
+  //_cv.get()->notify_all();
 
 }
 
index 4cc328a..c500827 100644 (file)
 #include <mutex>
 #include <condition_variable>
 #include <unordered_map>
+#include <algorithm>
+#include <ctime>
+#include <unistd.h>
 #include <chrono>
 #include <tuple>
-#include <string>
+#include <rmr/RIC_message_types.h>
 
-#include "../xapp-asn/e2ap/subscription_delete_request.hpp"
-#include "../xapp-asn/e2ap/subscription_delete_response.hpp"
-#include "../xapp-asn/e2ap/subscription_request.hpp"
-#include "../xapp-asn/e2ap/subscription_response.hpp"
+#include "subscription_delete_request.hpp"
+#include "subscription_delete_response.hpp"
+#include "subscription_request.hpp"
+#include "subscription_response.hpp"
 
 #define SUBSCR_SUCCESS 1
 #define SUBSCR_ERR_TX -1
@@ -87,12 +90,10 @@ typedef enum {
     request_pending = 1,
     request_success,
     request_failed,
-    delete_request_pending,
-    delete_request_success,
-    delete_request_failed,
     request_duplicate
 }Subscription_Status_Types;
 
+
 using transaction_identifier = std::string;
 using transaction_status = Subscription_Status_Types;
 
@@ -100,7 +101,7 @@ class SubscriptionHandler {
                            
 public:
 
-  SubscriptionHandler(unsigned int timeout_seconds = 10);
+  SubscriptionHandler(unsigned int timeout_seconds = 30);
   
   template <typename AppTransmitter>
   int manage_subscription_request(transaction_identifier, AppTransmitter &&);
@@ -110,13 +111,15 @@ public:
 
   void manage_subscription_response(int message_type, transaction_identifier id);
 
-  int const get_request_status(transaction_identifier);
+  int  get_request_status(transaction_identifier);
   bool set_request_status(transaction_identifier, transaction_status);
   bool is_request_entry(transaction_identifier);
   void set_timeout(unsigned int);
   void clear(void);
   void set_ignore_subs_resp(bool b){_ignore_subs_resp = b;};
 
+  void print_subscription_status(){ for(auto it:status_table){std::cout << it.first << "::" << it.second << std::endl;}};
+
 private:
   
   bool add_request_entry(transaction_identifier, transaction_status);
@@ -138,6 +141,7 @@ private:
 
 template <typename AppTransmitter>
 bool SubscriptionHandler::add_transmitter_entry(transaction_identifier id, AppTransmitter &&trans){
+         mdclog_write(MDCLOG_INFO,"Entry added for Transaction ID: %s",id.c_str());
 
   // add entry in hash table if it does not exist
   auto search = trans_table.find(id);
@@ -187,70 +191,14 @@ int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_
 
   // record time stamp ..
   auto start = std::chrono::system_clock::now();
-  res = SUBSCR_ERR_UNKNOWN;
-
-
-  while(1){
-         // release lock and wait to be woken up
-         _cv.get()->wait_for(_local_lock, _time_out);
-
-         // we have woken and acquired data_lock
-         // check status and return appropriate object
-         int status = get_request_status(rmr_trans_id);
-
-         if (status == request_success){
-                 mdclog_write(MDCLOG_INFO, "Successfully subscribed for request for trans_id %s", rmr_trans_id.c_str());
-                 res = SUBSCR_SUCCESS;
-                 break;
-         }
-
-         if (status == request_pending){
-                 // woken up spuriously or timed out
-                 auto end = std::chrono::system_clock::now();
-                 std::chrono::duration<double> f = end - start;
-
-                 if ( f > _time_out){
-
-                         mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request with transaction id %s timed out waiting for response ", __FILE__, __LINE__, rmr_trans_id.c_str());
-
-                         res = SUBSCR_ERR_TIMEOUT;
-                         //sunny side scenario. assuming subscription response is received.
-                         //res = SUBSCR_SUCCESS;
-                         break;
-                 }
-                 else{
-                                mdclog_write(MDCLOG_INFO, "Subscription request with transaction id %s Waiting for response....", rmr_trans_id.c_str());
-                                continue;
-                 }
-
-         }
-
-         if(status == request_failed){
-                 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s  got failure response .. \n", __FILE__, __LINE__, rmr_trans_id);
-                 res = SUBSCR_ERR_FAIL;
-                 break;
-         }
-
-         if (status == request_duplicate){
-                 mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request with transaction id %s is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, rmr_trans_id);
-                 res = SUBSCR_ERR_DUPLICATE;
-                 break;
-
-         }
+  std::chrono::milliseconds t_out(_time_out);
 
-         // if we are here, some spurious
-         // status obtained or request failed . we return appropriate error code
-         mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, and state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, rmr_trans_id, status);
-         res = SUBSCR_ERR_UNKNOWN;
-         break;
-  };
+  //the wait functionality has been removed.
 
-  delete_request_entry(rmr_trans_id);
 
-  // release data lock
   _local_lock.unlock();
- // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
-  return res;
 // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
+   return res;
 };
 
 #endif
index 2c33d55..d831c96 100644 (file)
@@ -60,5 +60,37 @@ TEST(SUBSCRIPTION, Request){
 
 }
 
+//create a MOck e2term
+TEST (MOCK, E2TERM){
+
+        const char* meid = "test1";
+        const char* sub_id = "sub1";
+        //Send the Subscription Response.
+        xapp_rmr_header hdr;
+        hdr.message_type = RIC_SUB_RESP;
+        clock_gettime(CLOCK_REALTIME, &(hdr.ts));
+        const char* strMsg = "Subscription Response for MEID: test1";
+        hdr.payload_length = strlen(strMsg);
+        strcpy((char*)hdr.meid, meid);
+        strcpy((char*)hdr.sid,sub_id);
+        int total_num_msgs = 2;
+        int num_attempts = 10;
+
+        std::unique_ptr<XappRmr> rmr;
+        rmr = std::make_unique<XappRmr>("4591",num_attempts);
+        rmr->xapp_rmr_init(true);
+
+        XappSettings config;
+
+        std::unique_ptr<Xapp> hw_xapp = std::make_unique<Xapp>(std::ref(config),std::ref(*rmr));
+        while(1){
+                bool res_msg = rmr->xapp_rmr_send(&hdr,(void*)strMsg);
+                if(res_msg){
+                        mdclog_write(MDCLOG_INFO, "Message Sent Successfully");
+                        break;
+                }
+                sleep(10);
+        }
+}
 
 #endif /* TEST_TEST_SUBS_H_ */