2 ==================================================================================
\r
4 Copyright (c) 2018-2019 SAMSUNG and AT&T Intellectual Property.
\r
6 Licensed under the Apache License, Version 2.0 (the "License");
\r
7 you may not use this file except in compliance with the License.
\r
8 You may obtain a copy of the License at
\r
10 http://www.apache.org/licenses/LICENSE-2.0
\r
12 Unless required by applicable law or agreed to in writing, software
\r
13 distributed under the License is distributed on an "AS IS" BASIS,
\r
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
15 See the License for the specific language governing permissions and
\r
16 limitations under the License.
\r
17 ==================================================================================
\r
20 #include "xapp_utils.hpp"
\r
22 // Constructor that automatically determines number of threads
\r
23 XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_attempts(0), _num_fails(0){
\r
25 _num_threads = std::thread::hardware_concurrency();
\r
26 strcpy(_xapp_name, xapp_name);
\r
27 strcpy(_proto_port, proto_port);
\r
28 _redis_port = redis_port;
\r
36 // Constructor that takes number of threads as argument
\r
37 XaPP::XaPP(char *xapp_name, char *proto_port, int redis_port, int msg_size, int num_threads): _is_ready(0), _listen(false), _num_retries(2), _msg_size(msg_size), _num_threads(num_threads),_num_attempts(0), _num_fails(0) {
\r
39 strcpy(_xapp_name, xapp_name);
\r
40 strcpy(_proto_port, proto_port);
\r
41 _redis_port = redis_port;
\r
47 // Destructor closes rmr context if available
\r
50 // Call stop to clear thread table
\r
54 rmr_free_msg(_rmr_tx_message);
\r
57 rmr_close(_rmr_ctx);
\r
62 /* Disconnects and frees the context */
\r
66 // Get the RMR context
\r
67 void XaPP::init(int msg_size){
\r
69 if (msg_size > RMR_BUFFER_SIZE or msg_size <= 0){
\r
70 std::stringstream ss;
\r
71 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Invalid buffer size " << msg_size << " for RMR initialization context. Must be between " << 1 << " and " << RMR_BUFFER_SIZE << std::endl;
\r
72 ss << " To change max buffer requested, update RMR_BUFFER_SIZE " << std::endl;
\r
73 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
\r
77 // Initialze the rmr context
\r
78 if ( (_rmr_ctx = rmr_init(_proto_port, msg_size, RMRFL_NONE)) == NULL){
\r
79 // throw exception here ..
\r
80 std::stringstream ss;
\r
81 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Error initiatilizing RMR context for " << _xapp_name << " on port " << _proto_port << " Reason = " << strerror(errno) << std::endl;
\r
82 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
\r
88 // Get the Redis Context
\r
89 void XaPP::redisInit(){
\r
91 // Initialize the Redis Context
\r
92 c = redisConnect("127.0.0.1", _redis_port);
\r
93 if (c == NULL || c->err) {
\r
94 std::stringstream ss;
\r
96 ss << "Error: " << c->errstr;
\r
99 ss <<"Can't allocate redis context";
\r
101 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
\r
107 // Blocks till routing table is received
\r
108 void XaPP::get_routes(void){
\r
110 // Wait for getting a routing table ...
\r
113 while( i < MAX_WAIT_TIME){
\r
114 std::cout <<"Waiting for RMR to be ready " << std::endl;
\r
115 if ((_is_ready = rmr_ready(_rmr_ctx)) == 1){
\r
123 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
124 std::string error_string = identifier + " Error getting routing table";
\r
125 throw std::runtime_error(error_string);
\r
129 // Get a tx buffer in case we need to do a transmit from the main thread itself
\r
130 if ( (_rmr_tx_message = rmr_alloc_msg(_rmr_ctx, RMR_BUFFER_SIZE)) == NULL){
\r
131 // throw exception here ..
\r
132 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
133 std::string error_string = identifier + " Error getting a send buffer";
\r
134 throw std::runtime_error(error_string);
\r
137 std::cout <<"Route Table received. Send buffer allocated" << std::endl;
\r
138 _transmit = new std::mutex();
\r
142 // Send method that takes TLV (type/length/value) input
\r
143 bool XaPP::Send(int type, int payload_len, void *payload){
\r
145 if (likely(_is_ready)){
\r
146 if (likely(payload_len <= RMR_BUFFER_SIZE)){
\r
147 _rmr_tx_message->mtype = type;
\r
148 memcpy(_rmr_tx_message->payload, payload, payload_len);
\r
149 _rmr_tx_message->len = payload_len;
\r
150 return Send(_rmr_tx_message);
\r
153 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
154 std::string error_string = identifier + " message payload len " + std::to_string(payload_len) + " exceeds maximum buffer size " + std::to_string(RMR_BUFFER_SIZE);
\r
155 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
\r
159 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
160 std::string error_string = identifier + " rmr not ready to send";
\r
161 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
\r
167 // Send method that takes TLV (type/length/value) input + MEID
\r
168 bool XaPP::Send(int type, int payload_len, void *payload, unsigned char const * meid){
\r
173 _rmr_tx_message->mtype = type;
\r
174 memcpy(_rmr_tx_message->payload, payload, payload_len);
\r
175 _rmr_tx_message->len = payload_len;
\r
176 rmr_str2meid(_rmr_tx_message, meid);
\r
177 return Send(_rmr_tx_message);
\r
182 // Send method that takes a buffer
\r
183 bool XaPP::Send(rmr_mbuf_t * rmr_tx_message){
\r
185 if(likely(_is_ready && rmr_tx_message->len <= RMR_BUFFER_SIZE && rmr_tx_message->len > 0)){
\r
187 rmr_tx_message->sub_id = RMR_VOID_SUBID;
\r
188 while(i <= _num_retries){
\r
190 //rmr_tx_message->state = 0; // fix for nng
\r
191 rmr_tx_message = rmr_send_msg(_rmr_ctx, rmr_tx_message);
\r
194 if (! rmr_tx_message){
\r
195 // CRITICAL EROR .. log it
\r
196 std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ;
\r
197 std::string error_string = identifier + " rmr_send returned NULL";
\r
198 mdclog_write(MDCLOG_ERR, error_string.c_str(), "");
\r
201 else if (rmr_tx_message->state == RMR_OK){
\r
204 else if(rmr_tx_message->state == RMR_ERR_RETRY){
\r
209 mdclog_write(MDCLOG_ERR, "Error : %s, %d. Unable to transmit RMR message. RMR state = %d, %s\n", __FILE__, __LINE__, rmr_tx_message->state, strerror(errno));
\r
216 mdclog_write(MDCLOG_ERR, "Error: %s, %d Invalid state/message for RMR tx. Ready = %d, Message len = %d\n", __FILE__, __LINE__, _is_ready, rmr_tx_message->len);
\r
220 std::stringstream ss;
\r
221 ss << "Error ::" << __FILE__ << "," << __LINE__ << " Could not send message of type " << rmr_tx_message->mtype << " and size " << rmr_tx_message->len << ". Reason = " << strerror(errno);
\r
222 mdclog_write(MDCLOG_ERR, ss.str().c_str(), "");
\r
228 void XaPP::Stop(void){
\r
229 // Get the mutex lock
\r
230 std::lock_guard<std::mutex> guard(*_transmit);
\r
233 //Wait for all threads to stop
\r
234 for(auto & t: thread_group){
\r
235 std::thread::id my_id = t.second.get_id();
\r
236 std::stringstream thread_id;
\r
237 thread_id << my_id;
\r
242 // Clear thread table ...
\r
243 thread_group.clear();
\r
247 // default error handler if non specified by user
\r
248 // pass through for now
\r
249 void XaPP::_error_handler(rmr_mbuf_t *message){
\r
255 //----------------------------------------
\r
256 // Some get/set methods
\r
257 //---------------------------------------
\r
259 std::string XaPP::getName(void){
\r
260 return std::string(_xapp_name);
\r
263 int XaPP::getStatus(void){
\r
267 bool XaPP::_isRunning(void){
\r
272 void * XaPP::get_rmr_context(void){
\r
276 void XaPP::set_num_retries(int num_retries){
\r
277 if (num_retries < 0 || num_retries > MAX_RETRIES){
\r
278 throw "[xapp_utils] : Illegal value of num_retries. Must be positive integer between 0 and MAX_RETRIES\n";
\r
281 _num_retries = num_retries;
\r
284 int XaPP::get_num_retries(void){
\r
285 return _num_retries;
\r
289 unsigned long XaPP::get_Send_attempts(void){
\r
290 return _num_attempts;
\r
294 unsigned long XaPP::get_Send_fails(void){
\r
298 // Initialization of Log level
\r
299 void init_logger(const char *AppName, mdclog_severity_t log_level)
\r
301 mdclog_attr_t *attr;
\r
302 mdclog_attr_init(&attr);
\r
303 mdclog_attr_set_ident(attr, AppName);
\r
305 mdclog_level_set(log_level);
\r
306 mdclog_attr_destroy(attr);
\r