--- /dev/null
+/*\r
+==================================================================================\r
+\r
+ Copyright (c) 2018-2019 AT&T Intellectual Property.\r
+\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+==================================================================================\r
+*/\r
+\r
+#include <iostream>\r
+#include <stdlib.h>\r
+#include <unistd.h>\r
+#include <string.h>\r
+#include <error.h>\r
+#include <thread>\r
+#include <map>\r
+#include <mutex>\r
+#include <functional>\r
+#include <sstream>\r
+#include <rmr/rmr.h>\r
+#include <rmr/RIC_message_types.h>\r
+#include <mdclog/mdclog.h>\r
+#include <hiredis/hiredis.h>\r
+\r
+\r
+#ifndef XAPP_UTIL\r
+# define XAPP_UTIL\r
+\r
+#define DEBUG 0\r
+\r
+#define XAPP_NAME_LENGTH 128\r
+#define PROTO_PORT_LENGTH 16\r
+#define MAX_RETRIES 16\r
+#define MAX_WAIT_TIME 10\r
+#define RMR_TIMEOUT 50// in mill-seconds\r
+#define RMR_BUFFER_SIZE 16384\r
+#define MAX_THREADS 8\r
+\r
+#ifdef __GNUC__\r
+#define likely(x) __builtin_expect((x), 1)\r
+#define unlikely(x) __builtin_expect((x), 0)\r
+#else\r
+#define likely(x) (x)\r
+#define unlikely(x) (x)\r
+#endif\r
+\r
+void init_logger(const char *AppName, mdclog_severity_t log_level);\r
+\r
+\r
+class XaPP {\r
+ \r
+ public:\r
+\r
+ XaPP(char *, char *, int, int);\r
+ XaPP(char *, char *, int, int, int);\r
+ ~XaPP(void);\r
+ XaPP(XaPP &&) = default; // destructor freaks out with non-copyable thread otherwise ..\r
+ std::string getName(void);\r
+ int getStatus(void);\r
+\r
+ // ideally can reduce tempate definitions to just two\r
+ // but for now leaving it open ...\r
+ \r
+ // template definition to allow a user defined\r
+ // processor to be started in multiple threads\r
+ template <typename messageProcessor>\r
+ void Start(messageProcessor &&);\r
+\r
+ // template definition to allow a user defined\r
+ // processor and error handler if a send fails \r
+ // to be started in multiple threads\r
+ template <typename messageProcessor, typename errorHandler>\r
+ void Start(messageProcessor &&, errorHandler &&);\r
+\r
+ // Template to allow a user defined processor to start\r
+ // on a single thread each time it is invoked\r
+ template <typename messageProcessor > \r
+ unsigned int StartThread(messageProcessor &&);\r
+\r
+ // Template to allow a user defined processor and \r
+ // error handle to start // on a single thread each time it\r
+ // is invoked\r
+ template <typename messageProcessor , typename errorHandler> \r
+ unsigned int StartThread(messageProcessor &&, errorHandler &&);\r
+\r
+ void Stop(void);\r
+ bool Send(int type, int payload_len, void *payload);\r
+ bool Send(int type, int payload_len, void *payload, unsigned char const *meid);\r
+ bool Send(rmr_mbuf_t * rmr_tx_message);\r
+ void * get_rmr_context(void);\r
+ void set_num_retries(int );\r
+ int get_num_retries(void );\r
+ unsigned long get_Send_attempts(void);\r
+ unsigned long get_Send_fails(void);\r
+ \r
+private:\r
+\r
+ void init(int);\r
+ void get_routes();\r
+ void redisInit();\r
+ void _error_handler(rmr_mbuf_t *); // pass through placeholder\r
+ \r
+ template<typename messageProcessor, typename errorHandler>\r
+ void _workThread(messageProcessor &&, errorHandler &&, XaPP *);\r
+\r
+ char _xapp_name[XAPP_NAME_LENGTH];\r
+ char _proto_port[PROTO_PORT_LENGTH];\r
+ int _redis_port;\r
+\r
+ int _is_ready;\r
+ bool _listen;\r
+ int _num_retries;\r
+ int _msg_size;\r
+ unsigned int _num_threads;\r
+ unsigned long _num_attempts;\r
+ unsigned long _num_fails;\r
+\r
+ void* _rmr_ctx;\r
+ redisContext *c;\r
+ std::mutex *_transmit;\r
+ std::map <unsigned int, std::thread> thread_group;\r
+ rmr_mbuf_t * _rmr_tx_message;\r
+ \r
+ bool _isRunning(void);\r
+ \r
+};\r
+\r
+\r
+template <typename messageProcessor, typename errorHandler>\r
+void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){\r
+\r
+\r
+ // Get the thread id \r
+ std::thread::id my_id = std::this_thread::get_id();\r
+ std::stringstream thread_id;\r
+ thread_id << my_id;\r
+\r
+ // Stats counters \r
+ unsigned long recvs = 0;\r
+ unsigned long attempts = 0;\r
+ unsigned long fails = 0;\r
+ \r
+ // Get the rmr context \r
+ void *rmr_context = parent->get_rmr_context(); \r
+ if (!rmr_context){\r
+ std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; \r
+ std::string error_string = identifier + " Thread : " + thread_id.str() + " Listener cannot run : no context available";\r
+ mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+ throw error_string;\r
+ }\r
+\r
+ // Get buffer specific to this thread\r
+ rmr_mbuf_t *rmr_message;\r
+ if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){\r
+ // throw exception here ..\r
+ std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; \r
+ std::string reason = strerror(errno);\r
+ std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;\r
+ mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+ throw error_string;\r
+ }\r
+ \r
+ int num_retries = this->get_num_retries();\r
+ \r
+ mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str());\r
+ \r
+ while(parent->_isRunning()){\r
+ \r
+ rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);\r
+ //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);\r
+ \r
+ if (rmr_message && rmr_message->state == RMR_OK){\r
+ recvs++;\r
+ bool res = msg_fn(c,rmr_message);\r
+ }\r
+ \r
+ }\r
+\r
+ // Clean up \r
+ try{\r
+ rmr_free_msg(rmr_message);\r
+ }\r
+ catch(std::runtime_error &e){\r
+ std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; \r
+ std::string error_string = identifier = " Error freeing RMR message ";\r
+ mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
+ }\r
+ \r
+ mdclog_write(MDCLOG_INFO, "Finished thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);\r
+}\r
+\r
+\r
+template <typename messageProcessor>\r
+void XaPP::Start(messageProcessor && msg_fn){\r
+\r
+ std::lock_guard<std::mutex> guard(*_transmit);\r
+ _listen = true;\r
+\r
+ // Spin up the the workThreads ..... \r
+ for(unsigned int i = 0; i < _num_threads; i++){\r
+ thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));\r
+ }\r
+ \r
+};\r
+\r
+// template if calling function provides an error handler also\r
+template <typename messageProcessor, typename errorHandler>\r
+void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){\r
+\r
+ std::lock_guard<std::mutex> guard(*_transmit);\r
+ _listen = true;\r
+\r
+ // Spin up the the workThreads ..... \r
+ for(unsigned int i = 0; i < _num_threads; i++){\r
+ //std::cout << "Starting thread number " << i << std::endl;\r
+ thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));\r
+\r
+ }\r
+\r
+ \r
+};\r
+\r
+// Template to allow a user defined processor to start\r
+// on a specific thread \r
+template <typename messageProcessor>\r
+unsigned int XaPP::StartThread(messageProcessor && msg_fn){\r
+\r
+ std::lock_guard<std::mutex> guard(*_transmit);\r
+ _listen = true;\r
+\r
+ _num_threads++;\r
+ thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));\r
+ return _num_threads;\r
+\r
+};\r
+\r
+\r
+// Template to allow a user defined processor and error handler to start\r
+// on a specific thread \r
+template <typename messageProcessor, typename errorHandler>\r
+unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){\r
+\r
+ std::lock_guard<std::mutex> guard(*_transmit);\r
+ _listen = true;\r
+\r
+ _num_threads++;\r
+ thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));\r
+ return _num_threads;\r
+\r
+};\r
+\r
+\r
+#endif\r