Initial commit of source directory
[ric-app/hw.git] / src / xapp-mgmt / subs_mgmt.hpp
diff --git a/src/xapp-mgmt/subs_mgmt.hpp b/src/xapp-mgmt/subs_mgmt.hpp
new file mode 100644 (file)
index 0000000..77458d3
--- /dev/null
@@ -0,0 +1,334 @@
+/*
+==================================================================================
+        Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+/*
+ * subs_mgmt.hpp
+ * Created on: 2019
+ * Author: Ashwin Shridharan, Shraboni Jana
+ */
+
+#pragma once
+
+#ifndef SUBSCRIPTION_HANDLER
+#define SUBSCRIPTION_HANDLER
+
+#include <functional>
+#include <mdclog/mdclog.h>
+#include <mutex>
+#include <condition_variable>
+#include <unordered_map>
+#include <chrono>
+#include <tuple>
+
+#include "subscription_delete_request.hpp"
+#include "subscription_delete_response.hpp"
+#include "subscription_request.hpp"
+#include "subscription_response.hpp"
+
+#define SUBSCR_SUCCESS 0
+#define SUBSCR_ERR_TX 1
+#define SUBSCR_ERR_TIMEOUT 2
+#define SUBSCR_ERR_FAIL 3
+#define SUBSCR_ERR_UNKNOWN 4
+#define SUBSCR_ERR_DUPLICATE 5
+#define SUBSCR_ERR_ENCODE  6
+#define SUBSCR_ERR_MISSING 7
+
+using namespace std;
+
+typedef enum {
+    request_pending = 1,
+    request_success,
+    request_failed,
+    delete_request_pending,
+    delete_request_success,
+    delete_request_failed,
+    request_duplicate
+}Subscription_Status_Types;
+
+using subscription_identifier = std::tuple<std::string , int>;
+
+struct subscription_hasher {
+  size_t operator()(const subscription_identifier & key) const {
+    return  std::hash<std::string>{}(std::get<0>(key) + std::to_string(std::get<1>(key)));
+  }
+};
+
+class SubscriptionHandler {
+                           
+public:
+
+       SubscriptionHandler(unsigned int timeout_seconds = 5, unsigned int num_retries = 2);
+  
+  void init(void);
+
+  template <typename Transmitter>
+  int request_subscription(std::string, int ,   Transmitter &&);
+
+  template<typename Transmitter>
+  int request_subscription_delete(subscription_helper  &, subscription_response_helper &, std::string,  int ,  Transmitter &&);
+
+  void  Response(int, unsigned char *, int, const char *);
+  int const get_request_status(subscription_identifier);
+  subscription_response_helper * const get_subscription(subscription_identifier);
+  
+  unsigned int get_next_id(void);
+  void set_timeout(unsigned int);
+  void set_num_retries(unsigned int);
+  
+  bool is_subscription_entry(subscription_identifier); 
+  bool is_request_entry(subscription_identifier);
+  void get_subscription_keys(std::vector<subscription_identifier> &);
+  void clear(void);
+  size_t  num_pending(void) const;
+  size_t  num_complete(void) const ;
+
+
+  
+private:
+
+  
+  bool add_request_entry(subscription_identifier, int);
+  bool set_request_status(subscription_identifier, int);
+  bool delete_request_entry(subscription_identifier);
+  bool get_subscription_entry(subscription_identifier);
+  bool add_subscription_entry(subscription_identifier, subscription_response_helper &he);
+  bool delete_subscription_entry(subscription_identifier);
+  
+  std::unordered_map<subscription_identifier, int, subscription_hasher> requests_table;
+  std::unordered_map<subscription_identifier, subscription_response_helper, subscription_hasher> subscription_responses; // stores list of successful subscriptions
+  
+  std::unique_ptr<std::mutex> _data_lock;
+  std::unique_ptr<std::condition_variable> _cv;
+
+  std::chrono::seconds _time_out;
+  unsigned int _num_retries = 2;
+  unsigned int unique_request_id = 0;
+  
+};
+
+template <typename Transmitter>
+int SubscriptionHandler::request_subscription(std::string node_id, int msgcode, Transmitter && tx){
+
+  // generate subscription identifier
+  subscription_identifier sub_id = std::make_tuple (node_id, 0); //0 is the function id which is hardcoded, which should come from rnib
+
+
+  bool res;
+  // put entry in request table
+  {
+    std::lock_guard<std::mutex> lock(*(_data_lock.get()));
+    res = add_request_entry(sub_id, request_pending);
+    if(! res){
+      mdclog_write(MDCLOG_ERR, "%s, %d : Error adding new subscription request %s, %d to queue because request with identical key already present",  __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      return SUBSCR_ERR_DUPLICATE;
+    }
+  }
+
+  // acquire lock ...
+  std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+
+
+  // Send the message
+  res = tx();
+
+  if (!res){
+    // clear state
+    delete_request_entry(sub_id);
+    mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription request %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
+    return SUBSCR_ERR_TX;
+  };
+
+
+  // record time stamp ..
+  auto start = std::chrono::system_clock::now();
+  res = SUBSCR_ERR_UNKNOWN;
+
+  while(1){
+    // release lock and wait to be woken up
+    _cv.get()->wait_for(_local_lock, _time_out);
+
+    // we have woken and acquired data_lock
+    // check status and return appropriate object
+
+    int status = get_request_status(sub_id);
+
+    if (status == request_success){
+      // retreive  & store the subscription response (why?)
+      // response = subscription_responses[sub_id];
+      mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      res = SUBSCR_SUCCESS;
+      break;
+    }
+
+    if (status == request_pending){
+      // woken up spuriously or timed out
+      auto end = std::chrono::system_clock::now();
+      std::chrono::duration<double> f = end - start;
+
+      if ( f > _num_retries * _time_out){
+       mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %s, %d timed out waiting for response ", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+       res = SUBSCR_ERR_TIMEOUT;
+       break;
+      }
+      else{
+       mdclog_write(MDCLOG_INFO, "Subscription request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+       continue;
+      }
+    }
+
+    if(status == request_failed){
+      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      res = SUBSCR_ERR_FAIL;
+      break;
+    }
+
+    if (status == request_duplicate){
+      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Request %s, %d is duplicate : subscription already present in table .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      res = SUBSCR_ERR_DUPLICATE;
+      break;
+
+    }
+
+    // if we are here, some spurious
+    // status obtained or request failed . we return appropriate error code
+    mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
+    res = SUBSCR_ERR_UNKNOWN;
+    break;
+  };
+
+  delete_request_entry(sub_id);
+
+  // release data lock
+  _local_lock.unlock();
+  std::cout <<"Returning  res = " << res << " for request = " << std::get<0>(sub_id).c_str() << "," <<  std::get<1>(sub_id) << std::endl;
+  return res;
+};
+
+
+/*template <typename Transmitter>
+int  SubscriptionHandler::request_subscription_delete(subscription_helper &he, subscription_response_helper &response, std::string node_id,  int TxCode, Transmitter && tx){
+
+  int res;
+  // generate subscription identifier
+  subscription_identifier sub_id = std::make_tuple (node_id, he.get_function_id());
+
+  // First check if we have this subscription
+  if(! is_subscription_entry(sub_id)){
+    mdclog_write(MDCLOG_ERR, "subscription with id %s, %d  does not exist. Cannot be deleted",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+    return SUBSCR_ERR_MISSING;
+  }
+
+  // Also check if such a request is queued
+  if (is_request_entry(sub_id)){
+    mdclog_write(MDCLOG_ERR, "Subscription delete request  with id %s, %d  already in queue",std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+    return SUBSCR_ERR_DUPLICATE;
+  }
+
+  subscription_delete e2ap_sub_req_del;
+
+  // generate the delete request pdu
+  unsigned char buffer[128];
+  size_t buf_len = 128;
+
+  res = e2ap_sub_req_del.encode_e2ap_subscription(&buffer[0], &buf_len, he);
+  if(! res){
+    mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription delete request pdu. Reason = %s", __FILE__, __LINE__, e2ap_sub_req_del.get_error().c_str());
+    return SUBSCR_ERR_ENCODE;
+  }
+
+  // put entry in request table
+  {
+    std::lock_guard<std::mutex> lock(*(_data_lock.get()));
+    res = add_request_entry(sub_id, delete_request_pending);
+    if(!res){
+      mdclog_write(MDCLOG_ERR, "%s, %d: Duplicate  subscription delete request = %s, %d", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id) );
+      return SUBSCR_ERR_DUPLICATE;
+    }
+  }
+
+  std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+
+  // Send the message
+  res = tx(TxCode,  buf_len, buffer);
+
+  if (!res){
+    delete_request_entry(sub_id);
+    mdclog_write(MDCLOG_ERR, "Error transmitting delete subscription request %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+    return SUBSCR_ERR_TX;
+  };
+
+
+  // record time stamp ..
+  auto start = std::chrono::system_clock::now();
+
+  res = SUBSCR_ERR_UNKNOWN;
+  while(1){
+
+    // wait to be woken up
+    _cv.get()->wait_for(_local_lock, _time_out);
+
+    // check status and return appropriate object
+    int status = get_request_status(sub_id);
+    if (status == delete_request_success){
+      mdclog_write(MDCLOG_INFO, "Successfully deleted subscription id %s, %d", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      res = SUBSCR_SUCCESS;
+      break;
+    }
+
+    if (status == delete_request_pending){
+      // woken up spuriously or timed out
+      auto end = std::chrono::system_clock::now();
+      std::chrono::duration<double> f = end - start;
+
+      if (f > _num_retries * _time_out){
+       mdclog_write(MDCLOG_ERR, "Subscription delete request %s, %d timed out waiting for response ", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+       res = SUBSCR_ERR_TIMEOUT;
+       break;
+      }
+      else{
+       mdclog_write(MDCLOG_INFO, "Subscription delete request %s, %d Waiting for response ....", std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      }
+
+      continue;
+    }
+
+    if(status == delete_request_failed){
+      mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Subscription Delete Request %s, %d  got failure response .. \n", __FILE__, __LINE__, std::get<0>(sub_id).c_str(), std::get<1>(sub_id));
+      res = SUBSCR_ERR_FAIL;
+      break;
+    }
+
+    // if we are here, some spurious
+    // status obtained. we return false
+
+    mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of  delete request %s, %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__,std::get<0>(sub_id).c_str(), std::get<1>(sub_id), status);
+    res =  SUBSCR_ERR_UNKNOWN;
+    break;
+
+  };
+
+  delete_request_entry(sub_id);
+
+  // release data lock
+  _local_lock.unlock();
+  std::cout <<"Returning  res = " << res << " for " << std::get<0>(sub_id) << "," << std::get<1>(sub_id) << std::endl;
+  return res;
+};*/
+
+#endif