User story RICPLT-2620
[ric-app/admin.git] / src / E2AP-c / subscription / subscription_handler.hpp
index fcfd77f..0dd78a8 100644 (file)
 #ifndef SUBSCRIPTION_HANDLER
 #define SUBSCRIPTION_HANDLER
 
+#include <functional>
 #include <mdclog/mdclog.h>
 #include <mutex>
 #include <condition_variable>
 #include <unordered_map>
 #include <chrono>
+#include <tuple>
 
 #include <subscription_request.hpp>
 #include <subscription_response.hpp>
@@ -58,39 +60,73 @@ typedef enum {
     request_duplicate
 }Subscription_Status_Types;
 
+using subscription_identifier = std::tuple<std::string , int>;
 
 /* Class to process subscription related messages 
-   each subscription request is assigned a unique internally
-generated request id for tracking purposes. this is because
-the e2 subscription request does not carry any gnodeb id information
+   The class provides mechanism to send and process
+   subscriptions and subscription deletes. 
+
+   NOTE 1: It is currently unclear how an xAPP should identify a
+   subscription request/response pair uniquely in the absence/presence of
+   subscription manager.  Ideally, the subscription manager should be
+   transperent to the xAPP but that may not be the case, i.e the
+   subscription manager may the subscription request id fields. 
+   The xAPP needs to identify uniquely not just the subscription response, but
+   also when it needs to send a delete for the corresponding request.
+   From that perspective, the fields present in both the subscription response and 
+   a delete request are the RICrequestId and RANfunctionId. However, the subscription manager
+   may require that the RICrequestId fields be set to a specific value (TBD).  Hence
+   for current purposes, a RIC subscription request is uniquely identified by the
+   tuple <gNodeB-ID, RANfunctionId>.  This is not ideal, since potentially the same RANfunctionID
+   may be subscribed to in different modes, but for now this is the constraint.
+
+
+   NOTE 2: There is discussion on tracking subscription request/repsonse using the RMR transaction ID.
+   However, a conscious choice made with the subscription_handler is that it be agnostic to the transmission
+   medium(RMR) for purposes of design isolation. Consequently, the subscription handler is not aware of any RMR
+   related semantics, but simply accepts a function to send the request/delete request that accepts a signature
+   Type, Length, Value . This also means in its current design, we cannot use transaction id to track request/response. 
+  
+
+   NOTE 3: The subscription handler is thread-safe, i.e multiple elements
+   can request subscriptions/subscription deletes from multiple threads. However
+   this does not preclude conflict if multiple threads are trying to make
+   subscriptions based on the same triplet (in which cases, results will be internally
+   consistent, but may yield errors to calling agent).
 
 */
 
-class subscription_handler {
+struct subscription_hasher {
+  size_t operator()(const subscription_identifier & key) const {
+    return  std::hash<std::string>{}(std::get<0>(key) + std::to_string(std::get<1>(key)));
+  }
+};
 
+class subscription_handler {
+                           
 public:
-  subscription_handler(void);
-  subscription_handler(unsigned int, unsigned int);
+
+  subscription_handler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
   
   void init(void);
+
   template <typename Transmitter>
-  int RequestSubscription(subscription_helper &,  subscription_response_helper &, int , Transmitter &&);
+  int request_subscription(subscription_helper &,  subscription_response_helper &, std::string, int ,   Transmitter &&);
 
   template<typename Transmitter>
-  int RequestSubscriptionDelete(subscription_helper  &, subscription_response_helper &, int , Transmitter &&);
-
-
-  void  Response(int, unsigned char *, int);
-  int const get_request_status(int);
-  subscription_response_helper * const get_subscription(int);
+  int request_subscription_delete(subscription_helper  &, subscription_response_helper &, std::string,  int ,  Transmitter &&);
 
+  void  Response(int, unsigned char *, int, const char *);
+  int const get_request_status(subscription_identifier);
+  subscription_response_helper * const get_subscription(subscription_identifier);
+  
   unsigned int get_next_id(void);
   void set_timeout(unsigned int);
   void set_num_retries(unsigned int);
   
-  bool is_subscription_entry(int); 
-  bool is_request_entry(int);
-
+  bool is_subscription_entry(subscription_identifier); 
+  bool is_request_entry(subscription_identifier);
+  void get_subscription_keys(std::vector<subscription_identifier> &);
   void clear(void);
   size_t  num_pending(void) const;
   size_t  num_complete(void) const ;
@@ -99,16 +135,17 @@ public:
   
 private:
 
-  bool add_request_entry(int, int);
-  bool set_request_status(int, int);
-  bool delete_request_entry(int);
+  
+  bool add_request_entry(subscription_identifier, int);
+  bool set_request_status(subscription_identifier, int);
+  bool delete_request_entry(subscription_identifier);
  
-  bool get_subscription_entry(int, int);
-  bool add_subscription_entry(int, subscription_response_helper &he);
-  bool delete_subscription_entry(int);
+  bool get_subscription_entry(subscription_identifier);
+  bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
+  bool delete_subscription_entry(subscription_identifier);
   
-  std::unordered_map<int, int> requests_table;
-  std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions
+  std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
+  std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
   
   std::unique_ptr<std::mutex> _data_lock;
   std::unique_ptr<std::condition_variable> _cv;
@@ -120,19 +157,19 @@ private:
 };
 
 template <typename Transmitter>
-int subscription_handler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
+int subscription_handler::request_subscription(subscription_helper &he, subscription_response_helper &response, std::string node_id, int TxCode, Transmitter && tx){
   
   int res;
   unsigned char buffer[512];
   size_t buf_len = 512;
 
-  // get a new unique request id ...
-  unsigned int new_req_id = get_next_id();
-  mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);
-  he.set_request(new_req_id, he.get_req_seq());
-  
-
+  // As per current design, request id and request sequence number
+  // must be set to zero ...  
+  he.set_request(0, 0);  
   subscription_request e2ap_sub_req;
+
+  // generate subscription identifier
+  subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
   
   // generate the request pdu
   res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, he);
@@ -144,10 +181,10 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript
   // put entry in request table
   {
     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
-    res = add_request_entry(he.get_request_id(), request_pending);
+    res = add_request_entry(sub_id, request_pending);
     if(! res){
-      mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %d to queue",  __FILE__, __LINE__, he.get_request_id());
-      return SUBSCR_ERR_UNKNOWN;
+      mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present",  __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      return SUBSCR_ERR_DUPLICATE;
     }
   }
 
@@ -159,8 +196,8 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript
   res = tx(TxCode,  buf_len, buffer);
   if (!res){
     // clear state
-    delete_request_entry(he.get_request_id());
-    mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %d", __FILE__, __LINE__,  he.get_request_id());
+    delete_request_entry(sub_id);
+    mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
     return SUBSCR_ERR_TX;
   };
 
@@ -170,80 +207,84 @@ int subscription_handler::RequestSubscription(subscription_helper &he, subscript
   res = SUBSCR_ERR_UNKNOWN;
   
   while(1){
-
-
     // release lock and wait to be woken up
     _cv.get()->wait_for(_local_lock, _time_out);
     
     // we have woken and acquired data_lock 
     // check status and return appropriate object
     
-    int status = get_request_status(he.get_request_id());
+    int status = get_request_status(sub_id);
     
     if (status == request_success){
-
       // retreive  & store the subscription response 
-      response = subscription_responses[he.get_request_id()];
-      mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());
+      response = subscription_responses[sub_id];
+      mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
       res = SUBSCR_SUCCESS;
       break;
     }
     
     if (status == request_pending){
-
       // woken up spuriously or timed out 
       auto end = std::chrono::system_clock::now();
       std::chrono::duration<double> f = end - start;
       
       if ( f > _num_retries * _time_out){
-       mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());
+       mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
        res = SUBSCR_ERR_TIMEOUT;
-       std::cout <<"Set res = " << res << " for " << he.get_request_id() << std::endl;
        break;
       }
       else{
-       mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());     
+       mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));     
        continue;
       }
     }
 
     if(status == request_failed){
-      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %d  got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
+      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
       res = SUBSCR_ERR_FAIL;
       break;
     }
 
+    if (status == request_duplicate){
+      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      res = SUBSCR_ERR_DUPLICATE;
+      break;
+
+    }
+    
     // if we are here, some spurious
-    // status obtained or request failed . we return false
-    mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
+    // status obtained or request failed . we return appropriate error code 
+    mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
     res = SUBSCR_ERR_UNKNOWN;
     break;
   };
 
-  delete_request_entry(he.get_request_id());
+  delete_request_entry(sub_id);
   
   // release data lock
   _local_lock.unlock();
-  std::cout <<"Returning  res = " << res << " for " << he.get_request_id() << std::endl;  
+  std::cout <<"Returning  res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," <<  std::get<1>(sub_id) << std::endl;  
   return res;
 };
 
 
 template <typename Transmitter>
-int  subscription_handler::RequestSubscriptionDelete(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){
+int  subscription_handler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id,  int TxCode, Transmitter && tx){
   
   int res;
-
+  // generate subscription identifier 
+  subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
+  
   // First check if we have this subscription
-  if(! is_subscription_entry(he.get_request_id())){
-    mdclog_write(MDCLOG_ERR, "subscription with id %d  does not exist. Cannot be deleted",  he.get_request_id());      
+  if(! is_subscription_entry(sub_id)){
+    mdclog_write(MDCLOG_ERR, "subscription with id %s, %d  does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));       
     return SUBSCR_ERR_MISSING;
   }  
   
   // Also check if such a request is queued
-  if (is_request_entry(he.get_request_id())){
-    mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %d  already in queue",  he.get_request_id());
-    return SUBSCR_ERR_UNKNOWN;
+  if (is_request_entry(sub_id)){
+    mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %s, %d  already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+    return SUBSCR_ERR_DUPLICATE;
   }
 
   subscription_delete e2ap_sub_req_del;
@@ -259,10 +300,13 @@ int  subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su
   }
   
   // put entry in request table
-  res = add_request_entry(he.get_request_id(), delete_request_pending);
-  if(! res){
-    mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());
-    return SUBSCR_ERR_UNKNOWN;
+  {
+    std::lock_guard<std::mutex> lock(*(_data_lock.get()));
+    res = add_request_entry(sub_id, delete_request_pending);
+    if(!res){
+      mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate  subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
+      return SUBSCR_ERR_DUPLICATE;
+    }
   }
   
   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
@@ -271,8 +315,8 @@ int  subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su
   res = tx(TxCode,  buf_len, buffer);
 
   if (!res){
-    delete_request_entry(he.get_request_id());
-    mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %d", he.get_request_id());
+    delete_request_entry(sub_id);
+    mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
     return SUBSCR_ERR_TX;
   };
 
@@ -287,9 +331,9 @@ int  subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su
     _cv.get()->wait_for(_local_lock, _time_out);
     
     // check status and return appropriate object
-    int status = get_request_status(he.get_request_id());
+    int status = get_request_status(sub_id);
     if (status == delete_request_success){
-      mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %d", he.get_request_id());
+      mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
       res = SUBSCR_SUCCESS;
       break;
     }
@@ -300,36 +344,37 @@ int  subscription_handler::RequestSubscriptionDelete(subscription_helper &he, su
       std::chrono::duration<double> f = end - start;
       
       if (f > _num_retries * _time_out){
-       mdclog_write(MDCLOG_ERR, "Subscription delete request %d timed out waiting for response ", he.get_request_id());
+       mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
        res = SUBSCR_ERR_TIMEOUT;
        break;
       }
       else{
-       mdclog_write(MDCLOG_INFO, "Subscription delete request %d Waiting for response ....", he.get_request_id()); 
+       mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id)); 
       }
       
       continue;
     }
 
     if(status == delete_request_failed){
-      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %d  got failure response .. \n", __FILE__, __LINE__, he.get_request_id());
+      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
       res = SUBSCR_ERR_FAIL;
       break;
     }
+
     // if we are here, some spurious
     // status obtained. we return false
     
-    mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of  delete request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);
+    mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of  delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
     res =  SUBSCR_ERR_UNKNOWN;
     break;
 
   };
 
-  delete_request_entry(he.get_request_id());
+  delete_request_entry(sub_id);
   
   // release data lock
   _local_lock.unlock();
-  std::cout <<"Returning  res = " << res << " for " << he.get_request_id() << std::endl;
+  std::cout <<"Returning  res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;
   return res;
 };