Adding initial code jy.oak@samsung.com
[ric-app/kpimon.git] / src / E2AP-c / subscription / subscription_handler.hpp
diff --git a/src/E2AP-c/subscription/subscription_handler.hpp b/src/E2AP-c/subscription/subscription_handler.hpp
new file mode 100755 (executable)
index 0000000..6d05f0e
--- /dev/null
@@ -0,0 +1,213 @@
+/*\r
+==================================================================================\r
+        Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+   Licensed under the Apache License, Version 2.0 (the "License");\r
+   you may not use this file except in compliance with the License.\r
+   You may obtain a copy of the License at\r
+\r
+       http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#pragma once\r
+\r
+#ifndef SUBSCRIPTION_HANDLER\r
+#define SUBSCRIPTION_HANDLER\r
+\r
+#include <mdclog/mdclog.h>\r
+#include <mutex>\r
+#include <condition_variable>\r
+#include <unordered_map>\r
+#include <chrono>\r
+\r
+#include <subscription_request.hpp>\r
+#include <subscription_response.hpp>\r
+\r
+using namespace std;\r
+\r
+typedef enum {\r
+    request_pending = 1,\r
+    request_success,\r
+    request_failed,\r
+    request_duplicate\r
+}Subscription_Status_Types;\r
+\r
+\r
+/* Class to process subscription related messages \r
+   each subscription request is assigned a unique internally\r
+generated request id for tracking purposes. this is because\r
+the e2 subscription request does not carry any gnodeb id information\r
+\r
+*/\r
+\r
+class SubscriptionHandler {\r
+\r
+public:\r
+  SubscriptionHandler(void);\r
+  SubscriptionHandler(unsigned int, unsigned int);\r
+  \r
+  void init(void);\r
+  template <typename Transmitter>\r
+  bool RequestSubscription(subscription_helper &,  subscription_response_helper &, int , Transmitter &&);\r
+\r
+\r
+  void  Response(int, unsigned char *, int);\r
+  int const get_request_status(int);\r
+  subscription_response_helper * const get_subscription(int);\r
+\r
+  unsigned int get_next_id(void);\r
+  void set_timeout(unsigned int);\r
+  void set_timeout_flag(bool);\r
+  void set_num_retries(unsigned int);\r
+  \r
+  bool is_subscription_entry(int); \r
+  bool is_request_entry(int);\r
+\r
+  void clear(void);\r
+  size_t  num_pending(void) const;\r
+  size_t  num_complete(void) const ;\r
+\r
+\r
+  \r
+private:\r
+\r
+  void ProcessSubscriptionResponse(unsigned char *, int len);\r
+  void ProcessSubscriptionFailure(unsigned char *, int len);\r
+\r
+  bool add_request_entry(int, int);\r
+  bool set_request_status(int, int);\r
+  bool delete_request_entry(int);\r
\r
+  bool get_subscription_entry(int, int);\r
+  bool add_subscription_entry(int, subscription_response_helper &he);\r
+  bool delete_subscription_entry(int);\r
+  \r
+  std::unordered_map<int, int> requests_table;\r
+  std::unordered_map<int, subscription_response_helper> subscription_responses; // stores list of successful subscriptions\r
+  \r
+  std::unique_ptr<std::mutex> _data_lock;\r
+  std::unique_ptr<std::condition_variable> _cv;\r
+\r
+  std::chrono::seconds _time_out;\r
+  unsigned int _num_retries = 2;\r
+  bool _time_out_flag = true;\r
+  unsigned int unique_request_id = 0;\r
+  \r
+};\r
+\r
+template <typename Transmitter>\r
+bool SubscriptionHandler::RequestSubscription(subscription_helper &he, subscription_response_helper &response, int TxCode, Transmitter && tx){\r
+  \r
+  bool res;\r
+  unsigned char buffer[512];\r
+  size_t buf_len = 512;\r
+\r
+  // get a new unique request id ...\r
+  unsigned int new_req_id = get_next_id();\r
+  mdclog_write(MDCLOG_INFO, "%s, %d:: Generated new request id %d\n", __FILE__, __LINE__, new_req_id);\r
+  he.set_request(new_req_id, he.get_req_seq());\r
+  \r
+  E2AP_PDU_t *e2ap_pdu = 0;\r
+  subscription_request e2ap_sub_req;\r
+  \r
+  // generate the request pdu\r
+  res = e2ap_sub_req.encode_e2ap_subscription(&buffer[0], &buf_len, e2ap_pdu, he);\r
+  if(! res){\r
+    mdclog_write(MDCLOG_ERR, "%s, %d: Error encoding subscription pdu. Reason = ", __FILE__, __LINE__);\r
+    return false;\r
+  }\r
+  \r
+  // put entry in request table\r
+  {\r
+    std::lock_guard<std::mutex> lock(*(_data_lock.get()));\r
+    res = add_request_entry(he.get_request_id(), request_pending);\r
+    if(! res){\r
+      mdclog_write(MDCLOG_ERR, "Error adding new subscription request %d to queue",  he.get_request_id());\r
+      return false;\r
+    }\r
+  }\r
+\r
+  // acquire lock ...\r
+  std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));\r
+\r
+\r
+  // Send the message\r
+  res = tx(TxCode,  buf_len, buffer);\r
+  if (!res){\r
+    // clear state\r
+    delete_request_entry(he.get_request_id());\r
+    mdclog_write(MDCLOG_ERR, "Error transmitting subscription request %d", he.get_request_id());\r
+    return false;\r
+  };\r
+\r
+  \r
+  // record time stamp ..\r
+  auto start = std::chrono::system_clock::now();\r
+  \r
+  while(1){\r
+\r
+\r
+    // release lock and wait to be woken up\r
+    _cv.get()->wait_for(_local_lock, _time_out);\r
+    \r
+    // we have woken and acquired data_lock \r
+    // check status and return appropriate object\r
+    \r
+    int status = get_request_status(he.get_request_id());\r
+    \r
+    if (status == request_success){\r
+      // retreive the subscription response and clear queue\r
+      response = subscription_responses[he.get_request_id()];\r
+\r
+      // clear state\r
+      delete_request_entry(he.get_request_id());\r
+      \r
+      // release data lock\r
+      _local_lock.unlock();\r
+      mdclog_write(MDCLOG_INFO, "Successfully subscribed for request %d", he.get_request_id());\r
\r
+      break;\r
+    }\r
+    \r
+    if (status == request_pending){\r
+      // woken up spuriously or timed out \r
+      auto end = std::chrono::system_clock::now();\r
+      std::chrono::duration<double> f = end - start;\r
+      \r
+      if (_time_out_flag && f > _num_retries * _time_out){\r
+       delete_request_entry(he.get_request_id());\r
+       mdclog_write(MDCLOG_ERR, "%s, %d:: Subscription request %d timed out waiting for response ", __FILE__, __LINE__, he.get_request_id());\r
+\r
+       // Release data lock\r
+       _local_lock.unlock();\r
+       return false;\r
+      }\r
+      else{\r
+       mdclog_write(MDCLOG_INFO, "Subscription request %d Waiting for response ....", he.get_request_id());     \r
+       continue;\r
+      }\r
+    }\r
+\r
+    // if we are here, some spurious\r
+    // status obtained or request failed . we return false\r
+    mdclog_write(MDCLOG_ERR, "Error :: %s, %d : Spurious time out caused by invalid state of request %d -- state = %d. Deleting request entry and failing .. \n", __FILE__, __LINE__, he.get_request_id(), status);\r
+    delete_request_entry(he.get_request_id());\r
+    \r
+    // release data lock\r
+    _local_lock.unlock();\r
+\r
+    return false;\r
+    \r
+  };\r
+            \r
+  return true;\r
+};\r
+\r
+#endif\r