Adding initial code jy.oak@samsung.com
[ric-app/kpimon.git] / src / xapp_utils.hpp
diff --git a/src/xapp_utils.hpp b/src/xapp_utils.hpp
new file mode 100755 (executable)
index 0000000..7e18d7d
--- /dev/null
@@ -0,0 +1,263 @@
+/*\r
+==================================================================================\r
+\r
+        Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+   Licensed under the Apache License, Version 2.0 (the "License");\r
+   you may not use this file except in compliance with the License.\r
+   You may obtain a copy of the License at\r
+\r
+       http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#include <iostream>\r
+#include <stdlib.h>\r
+#include <unistd.h>\r
+#include <string.h>\r
+#include <error.h>\r
+#include <thread>\r
+#include <map>\r
+#include <mutex>\r
+#include <functional>\r
+#include <sstream>\r
+#include <rmr/rmr.h>\r
+#include <rmr/RIC_message_types.h>\r
+#include <mdclog/mdclog.h>\r
+#include <hiredis/hiredis.h>\r
+\r
+\r
+#ifndef XAPP_UTIL\r
+# define XAPP_UTIL\r
+\r
+#define DEBUG 0\r
+\r
+#define XAPP_NAME_LENGTH 128\r
+#define PROTO_PORT_LENGTH 16\r
+#define MAX_RETRIES 16\r
+#define MAX_WAIT_TIME 10\r
+#define RMR_TIMEOUT 50// in mill-seconds\r
+#define RMR_BUFFER_SIZE 16384\r
+#define MAX_THREADS 8\r
+\r
+#ifdef __GNUC__\r
+#define likely(x)  __builtin_expect((x), 1)\r
+#define unlikely(x) __builtin_expect((x), 0)\r
+#else\r
+#define likely(x) (x)\r
+#define unlikely(x) (x)\r
+#endif\r
+\r
+void init_logger(const char  *AppName, mdclog_severity_t log_level);\r
+\r
+\r
+class XaPP {\r
+  \r
+ public:\r
+\r
+  XaPP(char *, char *, int, int);\r
+  XaPP(char *, char *, int, int, int);\r
+  ~XaPP(void);\r
+  XaPP(XaPP &&) = default;  // destructor freaks out with non-copyable thread otherwise ..\r
+  std::string getName(void);\r
+  int  getStatus(void);\r
+\r
+  // ideally can reduce tempate definitions to just two\r
+  // but for now leaving it open ...\r
+  \r
+  // template definition to allow a user defined\r
+  // processor to be started in multiple threads\r
+  template <typename messageProcessor>\r
+  void Start(messageProcessor &&);\r
+\r
+  // template definition to allow a user defined\r
+  // processor and  error handler if a send fails \r
+  // to be started in multiple threads\r
+  template <typename messageProcessor, typename errorHandler>\r
+  void Start(messageProcessor &&, errorHandler &&);\r
+\r
+  // Template to allow a user defined processor to start\r
+  // on a single thread each time it is invoked\r
+  template <typename messageProcessor > \r
+  unsigned int  StartThread(messageProcessor &&);\r
+\r
+  // Template to allow a user defined processor and \r
+  // error handle to start // on a single thread each time it\r
+  // is invoked\r
+  template <typename messageProcessor , typename errorHandler> \r
+  unsigned int StartThread(messageProcessor &&, errorHandler &&);\r
+\r
+  void Stop(void);\r
+  bool Send(int type,  int payload_len, void *payload);\r
+  bool Send(int type, int payload_len, void *payload, unsigned char const  *meid);\r
+  bool Send(rmr_mbuf_t * rmr_tx_message);\r
+  void * get_rmr_context(void);\r
+  void set_num_retries(int );\r
+  int  get_num_retries(void );\r
+  unsigned long get_Send_attempts(void);\r
+  unsigned long get_Send_fails(void);\r
+  \r
+private:\r
+\r
+  void init(int);\r
+  void get_routes();\r
+  void redisInit();\r
+  void _error_handler(rmr_mbuf_t *); // pass through placeholder\r
+  \r
+  template<typename messageProcessor, typename errorHandler>\r
+  void _workThread(messageProcessor &&, errorHandler &&, XaPP *);\r
+\r
+  char _xapp_name[XAPP_NAME_LENGTH];\r
+  char _proto_port[PROTO_PORT_LENGTH];\r
+  int _redis_port;\r
+\r
+  int _is_ready;\r
+  bool _listen;\r
+  int _num_retries;\r
+  int _msg_size;\r
+  unsigned int _num_threads;\r
+  unsigned long _num_attempts;\r
+  unsigned long _num_fails;\r
+\r
+  void* _rmr_ctx;\r
+  redisContext *c;\r
+  std::mutex *_transmit;\r
+  std::map <unsigned int, std::thread> thread_group;\r
+  rmr_mbuf_t * _rmr_tx_message;\r
\r
+  bool _isRunning(void);\r
+  \r
+};\r
+\r
+\r
+template <typename messageProcessor, typename errorHandler>\r
+void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){\r
+\r
+\r
+  // Get the thread id \r
+  std::thread::id my_id = std::this_thread::get_id();\r
+  std::stringstream thread_id;\r
+  thread_id << my_id;\r
+\r
+  // Stats counters \r
+  unsigned long recvs = 0;\r
+  unsigned long attempts = 0;\r
+  unsigned long fails = 0;\r
\r
+  // Get the rmr context \r
+  void *rmr_context = parent->get_rmr_context(); \r
+  if (!rmr_context){\r
+    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+    std::string error_string = identifier +  " Thread : " + thread_id.str() + " Listener cannot run : no context available";\r
+    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+    throw error_string;\r
+  }\r
+\r
+  // Get buffer specific to this thread\r
+  rmr_mbuf_t *rmr_message;\r
+  if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){\r
+    // throw exception here ..\r
+    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+    std::string reason = strerror(errno);\r
+    std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;\r
+    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+    throw error_string;\r
+  }\r
+  \r
+  int num_retries = this->get_num_retries();\r
+  \r
+  mdclog_write(MDCLOG_INFO, "Starting thread %s",  thread_id.str().c_str());\r
+  \r
+  while(parent->_isRunning()){\r
+    \r
+      rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);\r
+      //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);\r
+  \r
+    if (rmr_message && rmr_message->state == RMR_OK){\r
+      recvs++;\r
+      bool res = msg_fn(c,rmr_message);\r
+      }\r
+      \r
+    }\r
+\r
+  // Clean up \r
+  try{\r
+    rmr_free_msg(rmr_message);\r
+  }\r
+  catch(std::runtime_error &e){\r
+    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+    std::string error_string = identifier = " Error freeing RMR message ";\r
+    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+  }\r
+  \r
+  mdclog_write(MDCLOG_INFO, "Finished  thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);\r
+}\r
+\r
+\r
+template <typename messageProcessor>\r
+void XaPP::Start(messageProcessor && msg_fn){\r
+\r
+  std::lock_guard<std::mutex> guard(*_transmit);\r
+  _listen = true;\r
+\r
+  // Spin up the the workThreads ..... \r
+  for(unsigned int i = 0; i < _num_threads; i++){\r
+    thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));\r
+  }\r
+  \r
+};\r
+\r
+// template if calling function provides an error handler also\r
+template <typename messageProcessor, typename errorHandler>\r
+void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){\r
+\r
+  std::lock_guard<std::mutex> guard(*_transmit);\r
+  _listen = true;\r
+\r
+  // Spin up the the workThreads ..... \r
+  for(unsigned int i = 0; i < _num_threads; i++){\r
+    //std::cout << "Starting thread number " << i << std::endl;\r
+    thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));\r
+\r
+  }\r
+\r
+  \r
+};\r
+\r
+// Template to allow a user defined processor to start\r
+// on a specific thread \r
+template <typename messageProcessor>\r
+unsigned int XaPP::StartThread(messageProcessor && msg_fn){\r
+\r
+  std::lock_guard<std::mutex> guard(*_transmit);\r
+  _listen = true;\r
+\r
+  _num_threads++;\r
+  thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));\r
+  return _num_threads;\r
+\r
+};\r
+\r
+\r
+// Template to allow a user defined processor and error handler to start\r
+// on a specific thread \r
+template <typename messageProcessor, typename  errorHandler>\r
+unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){\r
+\r
+  std::lock_guard<std::mutex> guard(*_transmit);\r
+  _listen = true;\r
+\r
+  _num_threads++;\r
+  thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));\r
+  return _num_threads;\r
+\r
+};\r
+\r
+\r
+#endif\r