Updated INFO.yaml file
[ric-app/kpimon.git] / src / xapp_utils.cc
1 /*\r
2 ==================================================================================\r
3 \r
4         Copyright (c) 2018-2019 SAMSUNG and AT&T Intellectual Property.\r
5 \r
6    Licensed under the Apache License, Version 2.0 (the "License");\r
7    you may not use this file except in compliance with the License.\r
8    You may obtain a copy of the License at\r
9 \r
10        http://www.apache.org/licenses/LICENSE-2.0\r
11 \r
12    Unless required by applicable law or agreed to in writing, software\r
13    distributed under the License is distributed on an "AS IS" BASIS,\r
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
15    See the License for the specific language governing permissions and\r
16    limitations under the License.\r
17 ==================================================================================\r
18 */\r
19 \r
20 #include "xapp_utils.hpp"\r
21 \r
22 // Constructor that automatically determines number of threads\r
23 XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){\r
24 \r
25   _num_threads = std::thread::hardware_concurrency();\r
26   strcpy(_xapp_name, xapp_name);\r
27   strcpy(_proto_port, proto_port);\r
28   _redis_port = redis_port;\r
29   init(_msg_size);\r
30   get_routes();\r
31   redisInit();\r
32 };\r
33   \r
34 \r
35 \r
36 // Constructor that takes number of threads as argument \r
37 XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2),  _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) {\r
38 \r
39   strcpy(_xapp_name, xapp_name);\r
40   strcpy(_proto_port, proto_port);\r
41   _redis_port = redis_port;\r
42   init(_msg_size);\r
43   get_routes();\r
44   redisInit();\r
45 };\r
46   \r
47 // Destructor closes rmr context if available\r
48 XaPP::~XaPP(void){\r
49 \r
50   // Call stop to clear thread table\r
51   Stop();\r
52 \r
53   // free memory\r
54   rmr_free_msg(_rmr_tx_message);\r
55   \r
56   if (_rmr_ctx){\r
57     rmr_close(_rmr_ctx);\r
58   }\r
59   // delete mutex\r
60   delete _transmit;\r
61   \r
62   /* Disconnects and frees the context */\r
63   redisFree(c);\r
64 };\r
65 \r
66 // Get the RMR context \r
67 void XaPP::init(int msg_size){\r
68 \r
69   if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){\r
70     std::stringstream ss;\r
71     ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size  " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl;\r
72     ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl;\r
73     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
74     throw ss.str();\r
75   }\r
76   \r
77   // Initialze the rmr context\r
78   if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){\r
79     // throw exception here ..\r
80     std::stringstream ss;\r
81     ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl;\r
82     mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
83     throw ss.str();\r
84   }\r
85   \r
86 }\r
87 \r
88 // Get the Redis Context\r
89 void XaPP::redisInit(){\r
90 \r
91     // Initialize the Redis Context\r
92     c = redisConnect("127.0.0.1", _redis_port);\r
93     if (c == NULL || c->err) {\r
94         std::stringstream ss;\r
95         if (c) {\r
96             ss << "Error: " << c->errstr;\r
97             // handle error\r
98         } else {\r
99             ss <<"Can't allocate redis context";\r
100         }\r
101         mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
102         throw ss.str();\r
103     }\r
104 }\r
105 \r
106 \r
107 // Blocks till routing table is received\r
108 void XaPP::get_routes(void){\r
109   \r
110   // Wait for getting a routing table ...\r
111   int i = 0;\r
112   _is_ready = 0;\r
113   while( i < MAX_WAIT_TIME){\r
114     std::cout <<"Waiting for RMR to be ready " << std::endl;\r
115     if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){\r
116       break;\r
117     }\r
118     sleep(1);\r
119     i++;\r
120   };\r
121 \r
122   if(!_is_ready){\r
123     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
124     std::string error_string = identifier + " Error getting routing table";\r
125     throw std::runtime_error(error_string);\r
126   }\r
127   \r
128 \r
129   // Get a tx buffer in case we need to do a transmit from the main thread itself\r
130   if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){\r
131     // throw exception here ..\r
132     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
133     std::string error_string = identifier + " Error getting a send buffer";\r
134     throw std::runtime_error(error_string);\r
135   }\r
136 \r
137   std::cout <<"Route Table received. Send buffer allocated" << std::endl;\r
138   _transmit = new std::mutex();\r
139 \r
140 }\r
141 \r
142 // Send method that takes TLV (type/length/value) input\r
143 bool XaPP::Send(int type,  int payload_len, void *payload){\r
144 \r
145   if (likely(_is_ready)){\r
146     if (likely(payload_len <= RMR_BUFFER_SIZE)){\r
147       _rmr_tx_message->mtype  = type;\r
148       memcpy(_rmr_tx_message->payload, payload, payload_len);\r
149       _rmr_tx_message->len = payload_len;\r
150       return Send(_rmr_tx_message);\r
151     }\r
152     else{\r
153        std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
154        std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE);\r
155        mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
156     }\r
157   }\r
158   else{\r
159     std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
160     std::string error_string = identifier + " rmr not ready to send";\r
161     mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
162   }\r
163   \r
164   return false;\r
165 }\r
166 \r
167 // Send method that takes TLV (type/length/value) input + MEID\r
168 bool XaPP::Send(int type,  int payload_len, void *payload, unsigned char const * meid){\r
169   if (!_is_ready){\r
170     return false;\r
171   }\r
172 \r
173   _rmr_tx_message->mtype  = type;  \r
174   memcpy(_rmr_tx_message->payload, payload, payload_len);\r
175   _rmr_tx_message->len = payload_len;\r
176   rmr_str2meid(_rmr_tx_message, meid);\r
177   return Send(_rmr_tx_message);\r
178   \r
179 }\r
180 \r
181 \r
182 // Send method that takes a buffer\r
183 bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){\r
184 \r
185   if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE  && rmr_tx_message->len > 0)){\r
186     int i = 0;\r
187     rmr_tx_message->sub_id = RMR_VOID_SUBID;\r
188     while(i <= _num_retries){\r
189       \r
190       //rmr_tx_message->state = 0; // fix for nng\r
191       rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);\r
192       _num_attempts ++;\r
193       \r
194       if (! rmr_tx_message){\r
195         // CRITICAL EROR .. log it \r
196         std::string identifier = __FILE__ +  std::string(", Line: ") + std::to_string(__LINE__) ; \r
197         std::string error_string = identifier + " rmr_send returned NULL";\r
198         mdclog_write(MDCLOG_ERR, error_string.c_str(), "");\r
199         return false;\r
200       }\r
201       else if (rmr_tx_message->state == RMR_OK){\r
202         return true;\r
203       }\r
204       else  if(rmr_tx_message->state == RMR_ERR_RETRY){\r
205         i++;\r
206         _num_fails++;\r
207       }\r
208       else {\r
209         mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));\r
210         return false;\r
211       }\r
212     \r
213     }\r
214   }\r
215   else{\r
216     mdclog_write(MDCLOG_ERR, "Error: %s, %d  Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len);\r
217     return false;\r
218   }\r
219   \r
220   std::stringstream ss;\r
221   ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno);\r
222   mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");\r
223   \r
224   return false;\r
225 }\r
226 \r
227 \r
228 void XaPP::Stop(void){\r
229   // Get the mutex lock \r
230   std::lock_guard<std::mutex> guard(*_transmit);\r
231   _listen = false;\r
232 \r
233   //Wait for all threads to stop\r
234   for(auto & t: thread_group){\r
235     std::thread::id my_id = t.second.get_id();\r
236     std::stringstream thread_id;\r
237     thread_id << my_id;\r
238     t.second.join();\r
239 \r
240   }\r
241 \r
242   // Clear thread table ...\r
243   thread_group.clear();\r
244   \r
245 }\r
246 \r
247 // default error handler if non specified by user\r
248 // pass through for now\r
249 void XaPP::_error_handler(rmr_mbuf_t *message){\r
250 };\r
251 \r
252 \r
253 \r
254 \r
255 //----------------------------------------\r
256 // Some get/set methods\r
257 //---------------------------------------\r
258 \r
259 std::string XaPP::getName(void){\r
260   return std::string(_xapp_name);\r
261 }\r
262 \r
263 int XaPP::getStatus(void){\r
264   return _is_ready;\r
265 }\r
266 \r
267 bool XaPP::_isRunning(void){\r
268   return _listen;\r
269 }\r
270 \r
271 \r
272 void * XaPP::get_rmr_context(void){\r
273   return _rmr_ctx;\r
274 }\r
275 \r
276 void XaPP::set_num_retries(int num_retries){\r
277   if (num_retries < 0 || num_retries > MAX_RETRIES){\r
278     throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n";\r
279   }\r
280   \r
281   _num_retries = num_retries;\r
282 }\r
283 \r
284 int XaPP::get_num_retries(void){\r
285   return _num_retries;\r
286 }\r
287 \r
288 \r
289 unsigned long  XaPP::get_Send_attempts(void){\r
290   return _num_attempts;\r
291 };\r
292 \r
293 \r
294 unsigned long XaPP::get_Send_fails(void){\r
295   return _num_fails;\r
296 };\r
297 \r
298 // Initialization of Log level\r
299 void init_logger(const char  *AppName, mdclog_severity_t log_level)\r
300 {\r
301     mdclog_attr_t *attr;\r
302     mdclog_attr_init(&attr);\r
303     mdclog_attr_set_ident(attr, AppName);\r
304     mdclog_init(attr);\r
305     mdclog_level_set(log_level);\r
306     mdclog_attr_destroy(attr);\r
307 }\r