Updated INFO.yaml file
[ric-app/kpimon.git] / src / xapp_utils.hpp
1 /*\r
2 ==================================================================================\r
3 \r
4         Copyright (c) 2018-2019 SAMSUNG and AT&T Intellectual Property.\r
5 \r
6    Licensed under the Apache License, Version 2.0 (the "License");\r
7    you may not use this file except in compliance with the License.\r
8    You may obtain a copy of the License at\r
9 \r
10        http://www.apache.org/licenses/LICENSE-2.0\r
11 \r
12    Unless required by applicable law or agreed to in writing, software\r
13    distributed under the License is distributed on an "AS IS" BASIS,\r
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
15    See the License for the specific language governing permissions and\r
16    limitations under the License.\r
17 ==================================================================================\r
18 */\r
19 \r
20 #include <iostream>\r
21 #include <stdlib.h>\r
22 #include <unistd.h>\r
23 #include <string.h>\r
24 #include <error.h>\r
25 #include <thread>\r
26 #include <map>\r
27 #include <mutex>\r
28 #include <functional>\r
29 #include <sstream>\r
30 #include <rmr/rmr.h>\r
31 #include <rmr/RIC_message_types.h>\r
32 #include <mdclog/mdclog.h>\r
33 #include <hiredis/hiredis.h>\r
34 \r
35 \r
36 #ifndef XAPP_UTIL\r
37 # define XAPP_UTIL\r
38 \r
39 #define DEBUG 0\r
40 \r
41 #define XAPP_NAME_LENGTH 128\r
42 #define PROTO_PORT_LENGTH 16\r
43 #define MAX_RETRIES 16\r
44 #define MAX_WAIT_TIME 10\r
45 #define RMR_TIMEOUT 50// in mill-seconds\r
46 #define RMR_BUFFER_SIZE 16384\r
47 #define MAX_THREADS 8\r
48 \r
49 #ifdef __GNUC__\r
50 #define likely(x)  __builtin_expect((x), 1)\r
51 #define unlikely(x) __builtin_expect((x), 0)\r
52 #else\r
53 #define likely(x) (x)\r
54 #define unlikely(x) (x)\r
55 #endif\r
56 \r
57 void init_logger(const char  *AppName, mdclog_severity_t log_level);\r
58 \r
59 \r
60 class XaPP {\r
61   \r
62  public:\r
63 \r
64   XaPP(char *, char *, int, int);\r
65   XaPP(char *, char *, int, int, int);\r
66   ~XaPP(void);\r
67   XaPP(XaPP &&) = default;  // destructor freaks out with non-copyable thread otherwise ..\r
68   std::string getName(void);\r
69   int  getStatus(void);\r
70 \r
71   // ideally can reduce tempate definitions to just two\r
72   // but for now leaving it open ...\r
73   \r
74   // template definition to allow a user defined\r
75   // processor to be started in multiple threads\r
76   template <typename messageProcessor>\r
77   void Start(messageProcessor &&);\r
78 \r
79   // template definition to allow a user defined\r
80   // processor and  error handler if a send fails \r
81   // to be started in multiple threads\r
82   template <typename messageProcessor, typename errorHandler>\r
83   void Start(messageProcessor &&, errorHandler &&);\r
84 \r
85   // Template to allow a user defined processor to start\r
86   // on a single thread each time it is invoked\r
87   template <typename messageProcessor > \r
88   unsigned int  StartThread(messageProcessor &&);\r
89 \r
90   // Template to allow a user defined processor and \r
91   // error handle to start // on a single thread each time it\r
92   // is invoked\r
93   template <typename messageProcessor , typename errorHandler> \r
94   unsigned int StartThread(messageProcessor &&, errorHandler &&);\r
95 \r
96   void Stop(void);\r
97   bool Send(int type,  int payload_len, void *payload);\r
98   bool Send(int type, int payload_len, void *payload, unsigned char const  *meid);\r
99   bool Send(rmr_mbuf_t * rmr_tx_message);\r
100   void * get_rmr_context(void);\r
101   void set_num_retries(int );\r
102   int  get_num_retries(void );\r
103   unsigned long get_Send_attempts(void);\r
104   unsigned long get_Send_fails(void);\r
105   \r
106 private:\r
107 \r
108   void init(int);\r
109   void get_routes();\r
110   void redisInit();\r
111   void _error_handler(rmr_mbuf_t *); // pass through placeholder\r
112   \r
113   template<typename messageProcessor, typename errorHandler>\r
114   void _workThread(messageProcessor &&, errorHandler &&, XaPP *);\r
115 \r
116   char _xapp_name[XAPP_NAME_LENGTH];\r
117   char _proto_port[PROTO_PORT_LENGTH];\r
118   int _redis_port;\r
119 \r
120   int _is_ready;\r
121   bool _listen;\r
122   int _num_retries;\r
123   int _msg_size;\r
124   unsigned int _num_threads;\r
125   unsigned long _num_attempts;\r
126   unsigned long _num_fails;\r
127 \r
128   void* _rmr_ctx;\r
129   redisContext *c;\r
130   std::mutex *_transmit;\r
131   std::map <unsigned int, std::thread> thread_group;\r
132   rmr_mbuf_t * _rmr_tx_message;\r
133  \r
134   bool _isRunning(void);\r
135   \r
136 };\r
137 \r
138 \r
139 template <typename messageProcessor, typename errorHandler>\r
140 void XaPP::_workThread(messageProcessor && msg_fn, errorHandler && error_handler, XaPP *parent){\r
141 \r
142 \r
143   // Get the thread id \r
144   std::thread::id my_id = std::this_thread::get_id();\r
145   std::stringstream thread_id;\r
146   thread_id << my_id;\r
147 \r
148   // Stats counters \r
149   unsigned long recvs = 0;\r
150   unsigned long attempts = 0;\r
151   unsigned long fails = 0;\r
152  \r
153   // Get the rmr context \r
154   void *rmr_context = parent->get_rmr_context(); \r
155   if (!rmr_context){\r
156     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
157     std::string error_string = identifier +  " Thread : " + thread_id.str() + " Listener cannot run : no context available";\r
158     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
159     throw error_string;\r
160   }\r
161 \r
162   // Get buffer specific to this thread\r
163   rmr_mbuf_t *rmr_message;\r
164   if ( (rmr_message = rmr_alloc_msg(rmr_context, RMR_BUFFER_SIZE)) == NULL){\r
165     // throw exception here ..\r
166     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
167     std::string reason = strerror(errno);\r
168     std::string error_string = identifier + " Thread: " + thread_id.str() + " Error getting a buffer : " + reason;\r
169     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
170     throw error_string;\r
171   }\r
172   \r
173   int num_retries = this->get_num_retries();\r
174   \r
175   mdclog_write(MDCLOG_INFO, "Starting thread %s",  thread_id.str().c_str());\r
176   \r
177   while(parent->_isRunning()){\r
178     \r
179       rmr_message = rmr_torcv_msg(rmr_context, rmr_message, RMR_TIMEOUT);\r
180       //rmr_message = rmr_rcv_msg(rmr_context, rmr_message);\r
181   \r
182     if (rmr_message && rmr_message->state == RMR_OK){\r
183       recvs++;\r
184       bool res = msg_fn(c,rmr_message);\r
185       }\r
186       \r
187     }\r
188 \r
189   // Clean up \r
190   try{\r
191     rmr_free_msg(rmr_message);\r
192   }\r
193   catch(std::runtime_error &e){\r
194     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
195     std::string error_string = identifier = " Error freeing RMR message ";\r
196     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
197   }\r
198   \r
199   mdclog_write(MDCLOG_INFO, "Finished  thread %s : Recv = %lu, Tx Attempts = %lu, Tx Fail = %lu", thread_id.str().c_str(), recvs, attempts, fails);\r
200 }\r
201 \r
202 \r
203 template <typename messageProcessor>\r
204 void XaPP::Start(messageProcessor && msg_fn){\r
205 \r
206   std::lock_guard<std::mutex> guard(*_transmit);\r
207   _listen = true;\r
208 \r
209   // Spin up the the workThreads ..... \r
210   for(unsigned int i = 0; i < _num_threads; i++){\r
211     thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));\r
212   }\r
213   \r
214 };\r
215 \r
216 // template if calling function provides an error handler also\r
217 template <typename messageProcessor, typename errorHandler>\r
218 void XaPP::Start(messageProcessor && msg_fn, errorHandler && error_handler){\r
219 \r
220   std::lock_guard<std::mutex> guard(*_transmit);\r
221   _listen = true;\r
222 \r
223   // Spin up the the workThreads ..... \r
224   for(unsigned int i = 0; i < _num_threads; i++){\r
225     //std::cout << "Starting thread number " << i << std::endl;\r
226     thread_group.insert(std::make_pair(i, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));\r
227 \r
228   }\r
229 \r
230   \r
231 };\r
232 \r
233 // Template to allow a user defined processor to start\r
234 // on a specific thread \r
235 template <typename messageProcessor>\r
236 unsigned int XaPP::StartThread(messageProcessor && msg_fn){\r
237 \r
238   std::lock_guard<std::mutex> guard(*_transmit);\r
239   _listen = true;\r
240 \r
241   _num_threads++;\r
242   thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, std::bind(&XaPP::_error_handler, this, std::placeholders::_1), this);}))));\r
243   return _num_threads;\r
244 \r
245 };\r
246 \r
247 \r
248 // Template to allow a user defined processor and error handler to start\r
249 // on a specific thread \r
250 template <typename messageProcessor, typename  errorHandler>\r
251 unsigned int XaPP::StartThread(messageProcessor && msg_fn, errorHandler && error_handler){\r
252 \r
253   std::lock_guard<std::mutex> guard(*_transmit);\r
254   _listen = true;\r
255 \r
256   _num_threads++;\r
257   thread_group.insert(std::make_pair(_num_threads, std::thread( ([&](){_workThread(msg_fn, error_handler, this);}))));\r
258   return _num_threads;\r
259 \r
260 };\r
261 \r
262 \r
263 #endif\r