Adding initial code jy.oak@samsung.com
[ric-app/kpimon.git] / src / xapp_utils.cc
diff --git a/src/xapp_utils.cc b/src/xapp_utils.cc
new file mode 100755 (executable)
index 0000000..74d1bfd
--- /dev/null
@@ -0,0 +1,307 @@
+/*\r
+==================================================================================\r
+\r
+        Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+   Licensed under the Apache License, Version 2.0 (the "License");\r
+   you may not use this file except in compliance with the License.\r
+   You may obtain a copy of the License at\r
+\r
+       http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#include "xapp_utils.hpp"\r
+\r
+// Constructor that automatically determines number of threads\r
+XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){\r
+\r
+  _num_threads = std::thread::hardware_concurrency();\r
+  strcpy(_xapp_name, xapp_name);\r
+  strcpy(_proto_port, proto_port);\r
+  _redis_port = redis_port;\r
+  init(_msg_size);\r
+  get_routes();\r
+  redisInit();\r
+};\r
+  \r
+\r
+\r
+// Constructor that takes number of threads as argument \r
+XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2),  _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) {\r
+\r
+  strcpy(_xapp_name, xapp_name);\r
+  strcpy(_proto_port, proto_port);\r
+  _redis_port = redis_port;\r
+  init(_msg_size);\r
+  get_routes();\r
+  redisInit();\r
+};\r
+  \r
+// Destructor closes rmr context if available\r
+XaPP::~XaPP(void){\r
+\r
+  // Call stop to clear thread table\r
+  Stop();\r
+\r
+  // free memory\r
+  rmr_free_msg(_rmr_tx_message);\r
+  \r
+  if (_rmr_ctx){\r
+    rmr_close(_rmr_ctx);\r
+  }\r
+  // delete mutex\r
+  delete _transmit;\r
+  \r
+  /* Disconnects and frees the context */\r
+  redisFree(c);\r
+};\r
+\r
+// Get the RMR context \r
+void XaPP::init(int msg_size){\r
+\r
+  if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){\r
+    std::stringstream ss;\r
+    ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size  " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl;\r
+    ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl;\r
+    mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
+    throw ss.str();\r
+  }\r
+  \r
+  // Initialze the rmr context\r
+  if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){\r
+    // throw exception here ..\r
+    std::stringstream ss;\r
+    ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl;\r
+    mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
+    throw ss.str();\r
+  }\r
+  \r
+}\r
+\r
+// Get the Redis Context\r
+void XaPP::redisInit(){\r
+\r
+    // Initialize the Redis Context\r
+    c = redisConnect("127.0.0.1", _redis_port);\r
+    if (c == NULL || c->err) {\r
+        std::stringstream ss;\r
+        if (c) {\r
+            ss << "Error: " << c->errstr;\r
+            // handle error\r
+        } else {\r
+            ss <<"Can't allocate redis context";\r
+        }\r
+        mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
+        throw ss.str();\r
+    }\r
+}\r
+\r
+\r
+// Blocks till routing table is received\r
+void XaPP::get_routes(void){\r
+  \r
+  // Wait for getting a routing table ...\r
+  int i = 0;\r
+  _is_ready = 0;\r
+  while( i < MAX_WAIT_TIME){\r
+    std::cout <<"Waiting for RMR to be ready " << std::endl;\r
+    if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){\r
+      break;\r
+    }\r
+    sleep(1);\r
+    i++;\r
+  };\r
+\r
+  if(!_is_ready){\r
+    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+    std::string error_string = identifier + " Error getting routing table";\r
+    throw std::runtime_error(error_string);\r
+  }\r
+  \r
+\r
+  // Get a tx buffer in case we need to do a transmit from the main thread itself\r
+  if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){\r
+    // throw exception here ..\r
+    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+    std::string error_string = identifier + " Error getting a send buffer";\r
+    throw std::runtime_error(error_string);\r
+  }\r
+\r
+  std::cout <<"Route Table received. Send buffer allocated" << std::endl;\r
+  _transmit = new std::mutex();\r
+\r
+}\r
+\r
+// Send method that takes TLV (type/length/value) input\r
+bool XaPP::Send(int type,  int payload_len, void *payload){\r
+\r
+  if (likely(_is_ready)){\r
+    if (likely(payload_len <= RMR_BUFFER_SIZE)){\r
+      _rmr_tx_message->mtype  = type;\r
+      memcpy(_rmr_tx_message->payload, payload, payload_len);\r
+      _rmr_tx_message->len = payload_len;\r
+      return Send(_rmr_tx_message);\r
+    }\r
+    else{\r
+       std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+       std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE);\r
+       mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+    }\r
+  }\r
+  else{\r
+    std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+    std::string error_string = identifier + " rmr not ready to send";\r
+    mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+  }\r
+  \r
+  return false;\r
+}\r
+\r
+// Send method that takes TLV (type/length/value) input + MEID\r
+bool XaPP::Send(int type,  int payload_len, void *payload, unsigned char const * meid){\r
+  if (!_is_ready){\r
+    return false;\r
+  }\r
+\r
+  _rmr_tx_message->mtype  = type;  \r
+  memcpy(_rmr_tx_message->payload, payload, payload_len);\r
+  _rmr_tx_message->len = payload_len;\r
+  rmr_str2meid(_rmr_tx_message, meid);\r
+  return Send(_rmr_tx_message);\r
+  \r
+}\r
+\r
+\r
+// Send method that takes a buffer\r
+bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){\r
+\r
+  if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE  && rmr_tx_message->len > 0)){\r
+    int i = 0;\r
+    rmr_tx_message->sub_id = RMR_VOID_SUBID;\r
+    while(i <= _num_retries){\r
+      \r
+      //rmr_tx_message->state = 0; // fix for nng\r
+      rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);\r
+      _num_attempts ++;\r
+      \r
+      if (! rmr_tx_message){\r
+        // CRITICAL EROR .. log it \r
+        std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
+        std::string error_string = identifier + " rmr_send returned NULL";\r
+        mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+       return false;\r
+      }\r
+      else if (rmr_tx_message->state == RMR_OK){\r
+       return true;\r
+      }\r
+      else  if(rmr_tx_message->state == RMR_ERR_RETRY){\r
+       i++;\r
+       _num_fails++;\r
+      }\r
+      else {\r
+       mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));\r
+       return false;\r
+      }\r
+    \r
+    }\r
+  }\r
+  else{\r
+    mdclog_write(MDCLOG_ERR, "Error: %s, %d  Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len);\r
+    return false;\r
+  }\r
+  \r
+  std::stringstream ss;\r
+  ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno);\r
+  mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
+  \r
+  return false;\r
+}\r
+\r
+\r
+void XaPP::Stop(void){\r
+  // Get the mutex lock \r
+  std::lock_guard<std::mutex> guard(*_transmit);\r
+  _listen = false;\r
+\r
+  //Wait for all threads to stop\r
+  for(auto & t: thread_group){\r
+    std::thread::id my_id = t.second.get_id();\r
+    std::stringstream thread_id;\r
+    thread_id << my_id;\r
+    t.second.join();\r
+\r
+  }\r
+\r
+  // Clear thread table ...\r
+  thread_group.clear();\r
+  \r
+}\r
+\r
+// default error handler if non specified by user\r
+// pass through for now\r
+void XaPP::_error_handler(rmr_mbuf_t *message){\r
+};\r
+\r
+\r
+\r
+\r
+//----------------------------------------\r
+// Some get/set methods\r
+//---------------------------------------\r
+\r
+std::string XaPP::getName(void){\r
+  return std::string(_xapp_name);\r
+}\r
+\r
+int XaPP::getStatus(void){\r
+  return _is_ready;\r
+}\r
+\r
+bool XaPP::_isRunning(void){\r
+  return _listen;\r
+}\r
+\r
+\r
+void * XaPP::get_rmr_context(void){\r
+  return _rmr_ctx;\r
+}\r
+\r
+void XaPP::set_num_retries(int num_retries){\r
+  if (num_retries < 0 || num_retries > MAX_RETRIES){\r
+    throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n";\r
+  }\r
+  \r
+  _num_retries = num_retries;\r
+}\r
+\r
+int XaPP::get_num_retries(void){\r
+  return _num_retries;\r
+}\r
+\r
+\r
+unsigned long  XaPP::get_Send_attempts(void){\r
+  return _num_attempts;\r
+};\r
+\r
+\r
+unsigned long XaPP::get_Send_fails(void){\r
+  return _num_fails;\r
+};\r
+\r
+// Initialization of Log level\r
+void init_logger(const char  *AppName, mdclog_severity_t log_level)\r
+{\r
+    mdclog_attr_t *attr;\r
+    mdclog_attr_init(&attr);\r
+    mdclog_attr_set_ident(attr, AppName);\r
+    mdclog_init(attr);\r
+    mdclog_level_set(log_level);\r
+    mdclog_attr_destroy(attr);\r
+}\r